diff options
author | Vitaly Isaev <vitalyisaev@ydb.tech> | 2023-12-20 19:36:40 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-20 19:36:40 +0300 |
commit | d7d4cd24e87687aa47f2c0cd9d0e223346b12476 (patch) | |
tree | 5ca9b5334b2bc0ccc629899b13fc30d5aa4ad9e3 | |
parent | 324235089a2fb295bc5eefb1f2c45de28d783163 (diff) | |
download | ydb-d7d4cd24e87687aa47f2c0cd9d0e223346b12476.tar.gz |
YQ-2323: YQ Connector: use Connector docker image for YQL Generic provider integration tests (#604)
* YQ Connector: use Connector docker image instead of building it from scratch
* YQ Connector: tune performance of Generic provider integration tests
12 files changed, 155 insertions, 86 deletions
diff --git a/ydb/library/yql/providers/generic/connector/tests/clickhouse.py b/ydb/library/yql/providers/generic/connector/tests/clickhouse.py index 6d1261559d..73aa960fb1 100644 --- a/ydb/library/yql/providers/generic/connector/tests/clickhouse.py +++ b/ydb/library/yql/providers/generic/connector/tests/clickhouse.py @@ -9,6 +9,7 @@ from utils.database import Database from utils.log import make_logger from utils.schema import Schema from utils.settings import Settings +from utils.sql import format_values_for_bulk_sql_insert import test_cases.select_missing_database import test_cases.select_missing_table @@ -25,7 +26,7 @@ def prepare_table( schema: Schema, data_in: Sequence, ): - dbTable = f'{database.name}.{table_name}' + dbTable = f"{database.name}.{table_name}" # create database create_database_stmt = database.create(data_source_pb2.CLICKHOUSE) @@ -43,26 +44,17 @@ def prepare_table( return # create table - create_table_stmt = f'CREATE TABLE {dbTable} ({schema.yql_column_list(data_source_pb2.CLICKHOUSE)}) ENGINE = Memory' + create_table_stmt = f"CREATE TABLE {dbTable} ({schema.yql_column_list(data_source_pb2.CLICKHOUSE)}) ENGINE = Memory" LOGGER.debug(create_table_stmt) client.command(create_table_stmt) # write data - for row in data_in: - # prepare string with serialized data - values_dump = [] - for val in row: - if isinstance(val, str): - values_dump.append(f"'{val}'") - elif val is None: - values_dump.append('NULL') - else: - values_dump.append(str(val)) - values = ", ".join(values_dump) - - insert_stmt = f"INSERT INTO {dbTable} (*) VALUES ({values})" - LOGGER.debug(insert_stmt) - client.command(insert_stmt) + values = format_values_for_bulk_sql_insert(data_in) + insert_stmt = f"INSERT INTO {dbTable} (*) VALUES {values}" + # TODO: these logs may be too big when working with big tables, + # dump insert statement via yatest into file. + LOGGER.debug(insert_stmt) + client.command(insert_stmt) def select_positive( @@ -82,21 +74,25 @@ def select_positive( # NOTE: to assert equivalence we have to add explicit ORDER BY, # because Clickhouse's output will be randomly ordered otherwise. - where_statement = '' + where_statement = "" if test_case.select_where is not None: - where_statement = f'WHERE {test_case.select_where.filter_expression}' - order_by_expression = '' + where_statement = f"WHERE {test_case.select_where.filter_expression}" + order_by_expression = "" order_by_column_name = test_case.select_what.order_by_column_name if order_by_column_name: - order_by_expression = f'ORDER BY {order_by_column_name}' - yql_script = f''' + order_by_expression = f"ORDER BY {order_by_column_name}" + yql_script = f""" {test_case.pragmas_sql_string} SELECT {test_case.select_what.yql_select_names} FROM {settings.clickhouse.cluster_name}.{test_case.qualified_table_name} {where_statement} {order_by_expression} - ''' - result = dqrun_runner.run(test_dir=tmp_path, script=yql_script, generic_settings=test_case.generic_settings) + """ + result = dqrun_runner.run( + test_dir=tmp_path, + script=yql_script, + generic_settings=test_case.generic_settings, + ) assert result.returncode == 0, result.stderr @@ -115,11 +111,15 @@ def select_missing_database( test_case: test_cases.select_missing_database.TestCase, ): # select table from the database that does not exist - yql_script = f''' + yql_script = f""" SELECT * FROM {settings.clickhouse.cluster_name}.{test_case.qualified_table_name} - ''' - result = dqrun_runner.run(test_dir=tmp_path, script=yql_script, generic_settings=test_case.generic_settings) + """ + result = dqrun_runner.run( + test_dir=tmp_path, + script=yql_script, + generic_settings=test_case.generic_settings, + ) assert test_case.database.missing_database_msg(data_source_pb2.CLICKHOUSE) in result.stderr, result.stderr @@ -136,10 +136,14 @@ def select_missing_table( LOGGER.debug(create_database_stmt) client.command(create_database_stmt) - yql_script = f''' + yql_script = f""" SELECT * FROM {settings.clickhouse.cluster_name}.{test_case.qualified_table_name} - ''' - result = dqrun_runner.run(test_dir=tmp_path, script=yql_script, generic_settings=test_case.generic_settings) + """ + result = dqrun_runner.run( + test_dir=tmp_path, + script=yql_script, + generic_settings=test_case.generic_settings, + ) assert test_case.database.missing_table_msg(data_source_pb2.CLICKHOUSE) in result.stderr, result.stderr diff --git a/ydb/library/yql/providers/generic/connector/tests/docker-compose.yml b/ydb/library/yql/providers/generic/connector/tests/docker-compose.yml index ca6d386069..2991e0e883 100644 --- a/ydb/library/yql/providers/generic/connector/tests/docker-compose.yml +++ b/ydb/library/yql/providers/generic/connector/tests/docker-compose.yml @@ -1,20 +1,25 @@ version: '3.4' services: - postgres: - image: "postgres:15-bullseye@sha256:3411b9f2e5239cd7867f34fcf22fe964230f7d447a71d63c283e3593d3f84085" + postgresql: + image: postgres:15-bullseye@sha256:3411b9f2e5239cd7867f34fcf22fe964230f7d447a71d63c283e3593d3f84085 environment: POSTGRES_DB: db POSTGRES_USER: user POSTGRES_PASSWORD: password ports: - - "15432:5432" + - 15432:5432 clickhouse: - image: "clickhouse/clickhouse-server:23-alpine@sha256:b078c1cd294632afa2aeba3530e7ba2e568513da23304354f455a25fab575c06" + image: clickhouse/clickhouse-server:23-alpine@sha256:b078c1cd294632afa2aeba3530e7ba2e568513da23304354f455a25fab575c06 environment: CLICKHOUSE_DB: db CLICKHOUSE_USER: user CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1 CLICKHOUSE_PASSWORD: password ports: - - "19000:9000" - - "18123:8123" + - 19000:9000 + - 18123:8123 + fq-connector-go: + image: ghcr.io/ydb-platform/fq-connector-go:v0.0.6-rc.8@sha256:74ebae0530d916c1842a7fddfbddc6c018763a0401f2f627a44e8829692fe41f + ports: + - 50051:50051 + network_mode: host diff --git a/ydb/library/yql/providers/generic/connector/tests/postgresql.py b/ydb/library/yql/providers/generic/connector/tests/postgresql.py index 8dd1984d8d..91848b7b2a 100644 --- a/ydb/library/yql/providers/generic/connector/tests/postgresql.py +++ b/ydb/library/yql/providers/generic/connector/tests/postgresql.py @@ -9,6 +9,7 @@ from utils.log import make_logger from utils.postgresql import Client from utils.schema import Schema from utils.settings import Settings +from utils.sql import format_values_for_bulk_sql_insert import test_cases.select_missing_database import test_cases.select_missing_table @@ -28,7 +29,7 @@ def prepare_table( pg_schema: str = None, ): # create database - with client.get_cursor('postgres') as (conn, cur): + with client.get_cursor("postgres") as (conn, cur): database_exists_stmt = database.exists(data_source_pb2.POSTGRESQL) LOGGER.debug(database_exists_stmt) cur.execute(database_exists_stmt) @@ -62,15 +63,19 @@ def prepare_table( create_schema_stmt = f"CREATE SCHEMA IF NOT EXISTS {pg_schema}" LOGGER.debug(create_schema_stmt) cur.execute(create_schema_stmt) - table_name = f'{pg_schema}.{table_name}' + table_name = f"{pg_schema}.{table_name}" - create_table_stmt = f'CREATE TABLE {table_name} ({schema.yql_column_list(data_source_pb2.POSTGRESQL)})' + create_table_stmt = f"CREATE TABLE {table_name} ({schema.yql_column_list(data_source_pb2.POSTGRESQL)})" LOGGER.debug(create_table_stmt) cur.execute(create_table_stmt) - insert_stmt = f'INSERT INTO {table_name} ({schema.columns.names_with_commas}) VALUES ({", ".join(["%s"] * len(data_in[0]))})' + values = format_values_for_bulk_sql_insert(data_in) + + insert_stmt = f"INSERT INTO {table_name} ({schema.columns.names_with_commas}) VALUES {values}" + # TODO: these logs may be too big when working with big tables, + # dump insert statement via yatest into file. LOGGER.debug(insert_stmt) - cur.executemany(insert_stmt, data_in) + cur.execute(insert_stmt) conn.commit() cur.close() @@ -92,17 +97,21 @@ def select_positive( ) # read data - where_statement = '' + where_statement = "" if test_case.select_where is not None: - where_statement = f'WHERE {test_case.select_where.filter_expression}' - yql_script = f''' + where_statement = f"WHERE {test_case.select_where.filter_expression}" + yql_script = f""" {test_case.pragmas_sql_string} SELECT {test_case.select_what.yql_select_names} FROM {settings.postgresql.cluster_name}.{test_case.qualified_table_name} {where_statement} - ''' + """ LOGGER.debug(yql_script) - result = dqrun_runner.run(test_dir=tmp_path, script=yql_script, generic_settings=test_case.generic_settings) + result = dqrun_runner.run( + test_dir=tmp_path, + script=yql_script, + generic_settings=test_case.generic_settings, + ) assert result.returncode == 0, result.stderr @@ -122,12 +131,16 @@ def select_missing_database( ): # select table from database that does not exist - yql_script = f''' + yql_script = f""" SELECT * FROM {settings.postgresql.cluster_name}.{test_case.qualified_table_name} - ''' + """ LOGGER.debug(yql_script) - result = dqrun_runner.run(test_dir=tmp_path, script=yql_script, generic_settings=test_case.generic_settings) + result = dqrun_runner.run( + test_dir=tmp_path, + script=yql_script, + generic_settings=test_case.generic_settings, + ) assert test_case.database.missing_database_msg(data_source_pb2.POSTGRESQL) in result.stderr, result.stderr @@ -140,7 +153,7 @@ def select_missing_table( test_case: test_cases.select_missing_table.TestCase, ): # create database but don't create table - with client.get_cursor('postgres') as (conn, cur): + with client.get_cursor("postgres") as (conn, cur): database_exists_stmt = test_case.database.exists(data_source_pb2.POSTGRESQL) LOGGER.debug(database_exists_stmt) cur.execute(database_exists_stmt) @@ -155,12 +168,16 @@ def select_missing_table( cur.close() # read data - yql_script = f''' + yql_script = f""" SELECT * FROM {settings.postgresql.cluster_name}.{test_case.qualified_table_name} - ''' + """ LOGGER.debug(yql_script) - result = dqrun_runner.run(test_dir=tmp_path, script=yql_script, generic_settings=test_case.generic_settings) + result = dqrun_runner.run( + test_dir=tmp_path, + script=yql_script, + generic_settings=test_case.generic_settings, + ) assert test_case.database.missing_table_msg(data_source_pb2.POSTGRESQL) in result.stderr, result.stderr @@ -182,12 +199,16 @@ def select_pg_schema( ) # read data - yql_script = f''' + yql_script = f""" SELECT {test_case.select_what.yql_select_names} FROM {settings.postgresql.cluster_name}.{test_case.qualified_table_name} - ''' + """ LOGGER.debug(yql_script) - result = dqrun_runner.run(test_dir=tmp_path, script=yql_script, generic_settings=test_case.generic_settings) + result = dqrun_runner.run( + test_dir=tmp_path, + script=yql_script, + generic_settings=test_case.generic_settings, + ) assert result.returncode == 0, result.stderr diff --git a/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive_clickhouse.py b/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive_clickhouse.py index e154eb82bf..f66cc98834 100644 --- a/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive_clickhouse.py +++ b/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive_clickhouse.py @@ -384,7 +384,7 @@ class Factory: return [ TestCase( - name=f'pushdown_{data_source_kind}', + name=f'pushdown_{EDataSourceKind.Name(data_source_kind)}', data_in=data_in, data_out_=data_out, pragmas=dict({'generic.UsePredicatePushdown': 'true'}), @@ -412,7 +412,7 @@ class Factory: for base_tc in base_test_cases: for protocol in protocols: tc = replace(base_tc) - tc.name += f'_{protocol}' + tc.name += f'_{EProtocol.Name(protocol)}' tc.protocol = protocol test_cases.append(tc) return test_cases diff --git a/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive_common.py b/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive_common.py index b52608d88c..05b732ebc8 100644 --- a/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive_common.py +++ b/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive_common.py @@ -216,7 +216,9 @@ class Factory: # TODO: assert connector stats when it will be accessible ''' - table_size = 2.5 * self.ss.connector.paging_bytes_per_page * self.ss.connector.paging_prefetch_queue_capacity + # FIXME: uncomment to debug YQ-2729 + # table_size = 2.5 * self.ss.connector.paging_bytes_per_page * self.ss.connector.paging_prefetch_queue_capacity + table_size = self.ss.connector.paging_bytes_per_page * self.ss.connector.paging_prefetch_queue_capacity / 1000 schema = Schema( columns=ColumnList( @@ -239,7 +241,7 @@ class Factory: test_cases = [] for data_source_kind in data_source_kinds: tc = TestCase( - name=f'large_table_{data_source_kind}', + name=f'large_table', data_source_kind=data_source_kind, data_in=data_in, data_out_=data_in, @@ -273,7 +275,7 @@ class Factory: continue for protocol in protocols[base_tc.data_source_kind]: tc = replace(base_tc) - tc.name += f'_{protocol}' + tc.name += f'_{EProtocol.Name(protocol)}' tc.protocol = protocol test_cases.append(tc) diff --git a/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive_postgresql.py b/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive_postgresql.py index ddc1a4c972..772c818116 100644 --- a/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive_postgresql.py +++ b/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive_postgresql.py @@ -429,7 +429,7 @@ class Factory: return [ TestCase( - name=f'pushdown_{data_source_kind}', + name=f'pushdown_{EDataSourceKind.Name(data_source_kind)}', data_in=data_in, data_out_=data_out_1, pragmas=dict({'generic.UsePredicatePushdown': 'true'}), @@ -440,7 +440,7 @@ class Factory: database=Database.make_for_data_source_kind(data_source_kind), ), TestCase( - name=f'pushdown_{data_source_kind}', + name=f'pushdown_{EDataSourceKind.Name(data_source_kind)}', data_in=data_in, data_out_=data_out_2, pragmas=dict({'generic.UsePredicatePushdown': 'true'}), @@ -469,7 +469,7 @@ class Factory: for base_tc in base_test_cases: for protocol in protocols: tc = replace(base_tc) - tc.name += f'_{protocol}' + tc.name += f'_{EProtocol.Name(protocol)}' tc.protocol = protocol test_cases.append(tc) return test_cases diff --git a/ydb/library/yql/providers/generic/connector/tests/utils/dqrun.py b/ydb/library/yql/providers/generic/connector/tests/utils/dqrun.py index 7c98d24a0b..d986631dcd 100644 --- a/ydb/library/yql/providers/generic/connector/tests/utils/dqrun.py +++ b/ydb/library/yql/providers/generic/connector/tests/utils/dqrun.py @@ -1,6 +1,7 @@ from pathlib import Path import subprocess from typing import Final +import json import jinja2 @@ -213,6 +214,7 @@ class DqRunner(Runner): data_out = None data_out_with_types = None schema = None + unique_suffix = test_dir.name if out.returncode == 0: # Parse output @@ -236,7 +238,6 @@ class DqRunner(Runner): for line in out.stderr.decode('utf-8').splitlines(): LOGGER.error(line) - unique_suffix = test_dir.name err_file = yatest.common.output_path(f'dqrun-{unique_suffix}.err') with open(err_file, "w") as f: f.write(out.stderr.decode('utf-8')) diff --git a/ydb/library/yql/providers/generic/connector/tests/utils/kqprun.py b/ydb/library/yql/providers/generic/connector/tests/utils/kqprun.py index ae43e7d545..d37c861504 100644 --- a/ydb/library/yql/providers/generic/connector/tests/utils/kqprun.py +++ b/ydb/library/yql/providers/generic/connector/tests/utils/kqprun.py @@ -180,6 +180,7 @@ class KqpRunner(Runner): data_out = None data_out_with_types = None schema = None + unique_suffix = test_dir.name if out.returncode == 0: # Parse output @@ -206,6 +207,7 @@ class KqpRunner(Runner): LOGGER.debug('Schema: %s', schema) LOGGER.debug('Data out: %s', data_out) LOGGER.debug('Data out with types: %s', data_out_with_types) + else: LOGGER.error('STDOUT: ') for line in out.stdout.decode('utf-8').splitlines(): @@ -214,7 +216,6 @@ class KqpRunner(Runner): for line in out.stderr.decode('utf-8').splitlines(): LOGGER.error(line) - unique_suffix = test_dir.name err_file = yatest.common.output_path(f'kqprun-{unique_suffix}.err') with open(err_file, "w") as f: f.write(out.stderr.decode('utf-8')) diff --git a/ydb/library/yql/providers/generic/connector/tests/utils/settings.py b/ydb/library/yql/providers/generic/connector/tests/utils/settings.py index 8cb049c2e7..a604d20728 100644 --- a/ydb/library/yql/providers/generic/connector/tests/utils/settings.py +++ b/ydb/library/yql/providers/generic/connector/tests/utils/settings.py @@ -44,10 +44,10 @@ class Settings: def from_env(cls) -> 'Settings': return cls( connector=cls.Connector( - grpc_host=environ['YDB_CONNECTOR_RECIPE_GRPC_HOST'], - grpc_port=int(environ['YDB_CONNECTOR_RECIPE_GRPC_PORT']), - paging_bytes_per_page=int(environ['YDB_CONNECTOR_RECIPE_GRPC_PAGING_BYTES_PER_PAGE']), - paging_prefetch_queue_capacity=int(environ['YDB_CONNECTOR_RECIPE_GRPC_PAGING_PREFETCH_QUEUE_CAPACITY']), + grpc_host='localhost', + grpc_port=50051, + paging_bytes_per_page=4*1024*1024, + paging_prefetch_queue_capacity=2, ), clickhouse=cls.ClickHouse( cluster_name='clickhouse_integration_test', @@ -75,7 +75,7 @@ class Settings: case EDataSourceKind.POSTGRESQL: return self.postgresql.cluster_name case _: - raise Exception(f'invalid data source: {data_source_kind}') + raise Exception(f'invalid data source: {EDataSourceKind.Name(data_source_kind)}') @dataclass diff --git a/ydb/library/yql/providers/generic/connector/tests/utils/sql.py b/ydb/library/yql/providers/generic/connector/tests/utils/sql.py new file mode 100644 index 0000000000..9bf93e9f8b --- /dev/null +++ b/ydb/library/yql/providers/generic/connector/tests/utils/sql.py @@ -0,0 +1,30 @@ +import datetime +from typing import Sequence + +def format_values_for_sql_insert(row: Sequence) -> str: + row_values_dump = [] + for val in row: + if isinstance(val, str): + row_values_dump.append(f"'{val}'") + elif isinstance(val, datetime.date): + row_values_dump.append(f"'{val}'") + elif isinstance(val, datetime.datetime): + row_values_dump.append(f"'{val}'") + elif val is None: + row_values_dump.append('NULL') + else: + row_values_dump.append(str(val)) + values = "(" + ", ".join(row_values_dump) + ")" + return values + +def format_values_for_bulk_sql_insert(data_in: Sequence) -> str: + """ + This function helps to build multiline INSERTs, like this: + + INSERT INTO items (col1, col2) VALUES + ('B6717', 110), + ('HG120', 111), + ('MD2L2', 112); + """ + values = map(format_values_for_sql_insert, data_in) + return ", ".join(values) diff --git a/ydb/library/yql/providers/generic/connector/tests/utils/ya.make b/ydb/library/yql/providers/generic/connector/tests/utils/ya.make index 5e0640ecfa..5b53dcacd7 100644 --- a/ydb/library/yql/providers/generic/connector/tests/utils/ya.make +++ b/ydb/library/yql/providers/generic/connector/tests/utils/ya.make @@ -14,6 +14,7 @@ PY_SRCS( runner.py schema.py settings.py + sql.py ) PEERDIR( diff --git a/ydb/library/yql/providers/generic/connector/tests/ya.make b/ydb/library/yql/providers/generic/connector/tests/ya.make index 182e8e226d..e5918f495f 100644 --- a/ydb/library/yql/providers/generic/connector/tests/ya.make +++ b/ydb/library/yql/providers/generic/connector/tests/ya.make @@ -5,23 +5,27 @@ NO_CHECK_IMPORTS() SIZE(LARGE) -# TAG and REQUIREMENTS are copied from: https://docs.yandex-team.ru/devtools/test/environment#docker-compose -TAG( - ya:external - ya:force_sandbox - ya:fat -) - -REQUIREMENTS( - container:4467981730 - cpu:all - dns:dns64 -) +IF (AUTOCHECK) + # Split tests to chunks only when they're running on different machines with distbuild, + # otherwise this directive will slow down local test execution. + # Look through https://st.yandex-team.ru/DEVTOOLSSUPPORT-39642 for more information. + FORK_SUBTESTS() + + # TAG and REQUIREMENTS are copied from: https://docs.yandex-team.ru/devtools/test/environment#docker-compose + TAG( + ya:external + ya:force_sandbox + ya:fat + ) + + REQUIREMENTS( + container:4467981730 + cpu:all + dns:dns64 + ) +ENDIF() INCLUDE(${ARCADIA_ROOT}/library/recipes/docker_compose/recipe.inc) -INCLUDE(${ARCADIA_ROOT}/ydb/library/yql/providers/generic/connector/recipe/recipe.inc) - -FORK_SUBTESTS() TEST_SRCS( conftest.py |