author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 09:58:56 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 10:20:20 +0300 |
commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/python/clickhouse-connect/clickhouse_connect/driver | |
parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
download | ydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz | |
YQ Connector: move tests from yql to ydb (OSS)
Move the folder with the Connector tests from the yql folder to the ydb folder (synchronized with github).
Diffstat (limited to 'contrib/python/clickhouse-connect/clickhouse_connect/driver')
25 files changed, 3784 insertions, 0 deletions
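For orientation, below is a minimal usage sketch of the driver entry point introduced in `clickhouse_connect/driver/__init__.py` in the diff that follows (the `create_client` factory returning an `HttpClient`). Host and credentials are placeholders; in the upstream package the same factory is typically reached through `clickhouse_connect.get_client`.

```python
# Minimal sketch of the vendored driver API (placeholder host/credentials).
from clickhouse_connect.driver import create_client

# With no interface/secure flags, create_client infers http on port 8123;
# secure=True (or port 8443/443) switches to https.
client = create_client(host='localhost', username='default', password='')

result = client.query('SELECT version(), timezone()')
print(result.result_set)   # row/column data; orientation follows column_oriented
client.close()
```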
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py new file mode 100644 index 0000000000..1320c04213 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py @@ -0,0 +1,118 @@ +from inspect import signature +from typing import Optional, Union, Dict, Any +from urllib.parse import urlparse, parse_qs + +import clickhouse_connect.driver.ctypes +from clickhouse_connect.driver.client import Client +from clickhouse_connect.driver.common import dict_copy +from clickhouse_connect.driver.exceptions import ProgrammingError +from clickhouse_connect.driver.httpclient import HttpClient + + +# pylint: disable=too-many-arguments,too-many-locals,too-many-branches +def create_client(*, + host: str = None, + username: str = None, + password: str = '', + database: str = '__default__', + interface: Optional[str] = None, + port: int = 0, + secure: Union[bool, str] = False, + dsn: Optional[str] = None, + settings: Optional[Dict[str, Any]] = None, + generic_args: Optional[Dict[str, Any]] = None, + **kwargs) -> Client: + """ + The preferred method to get a ClickHouse Connect Client instance + + :param host: The hostname or IP address of the ClickHouse server. If not set, localhost will be used. + :param username: The ClickHouse username. If not set, the default ClickHouse user will be used. + :param password: The password for username. + :param database: The default database for the connection. If not set, ClickHouse Connect will use the + default database for username. + :param interface: Must be http or https. Defaults to http, or to https if port is set to 8443 or 443 + :param port: The ClickHouse HTTP or HTTPS port. If not set will default to 8123, or to 8443 if secure=True + or interface=https. + :param secure: Use https/TLS. This overrides inferred values from the interface or port arguments. + :param dsn: A string in standard DSN (Data Source Name) format. Other connection values (such as host or user) + will be extracted from this string if not set otherwise. + :param settings: ClickHouse server settings to be used with the session/every request + :param generic_args: Used internally to parse DBAPI connection strings into keyword arguments and ClickHouse settings. + It is not recommended to use this parameter externally. + + :param kwargs -- Recognized keyword arguments (used by the HTTP client), see below + + :param compress: Enable compression for ClickHouse HTTP inserts and query results. True will select the preferred + compression method (lz4). A str of 'lz4', 'zstd', 'brotli', or 'gzip' can be used to use a specific compression type + :param query_limit: Default LIMIT on returned rows. 0 means no limit + :param connect_timeout: Timeout in seconds for the http connection + :param send_receive_timeout: Read timeout in seconds for http connection + :param client_name: client_name prepended to the HTTP User Agent header. Set this to track client queries + in the ClickHouse system.query_log. + :param send_progress: Deprecated, has no effect. Previous functionality is now automatically determined + :param verify: Verify the server certificate in secure/https mode + :param ca_cert: If verify is True, the file path to Certificate Authority root to validate ClickHouse server + certificate, in .pem format. Ignored if verify is False. This is not necessary if the ClickHouse server + certificate is trusted by the operating system. 
To trust the maintained list of "global" public root + certificates maintained by the Python 'certifi' package, set ca_cert to 'certifi' + :param client_cert: File path to a TLS Client certificate in .pem format. This file should contain any + applicable intermediate certificates + :param client_cert_key: File path to the private key for the Client Certificate. Required if the private key + is not included the Client Certificate key file + :param session_id ClickHouse session id. If not specified and the common setting 'autogenerate_session_id' + is True, the client will generate a UUID1 session id + :param pool_mgr Optional urllib3 PoolManager for this client. Useful for creating separate connection + pools for multiple client endpoints for applications with many clients + :param http_proxy http proxy address. Equivalent to setting the HTTP_PROXY environment variable + :param https_proxy https proxy address. Equivalent to setting the HTTPS_PROXY environment variable + :param server_host_name This is the server host name that will be checked against a TLS certificate for + validity. This option can be used if using an ssh_tunnel or other indirect means to an ClickHouse server + where the `host` argument refers to the tunnel or proxy and not the actual ClickHouse server + :return: ClickHouse Connect Client instance + """ + if dsn: + parsed = urlparse(dsn) + username = username or parsed.username + password = password or parsed.password + host = host or parsed.hostname + port = port or parsed.port + if parsed.path and (not database or database == '__default__'): + database = parsed.path[1:].split('/')[0] + database = database or parsed.path + kwargs.update(dict(parse_qs(parsed.query))) + use_tls = str(secure).lower() == 'true' or interface == 'https' or (not interface and port in (443, 8443)) + if not host: + host = 'localhost' + if not interface: + interface = 'https' if use_tls else 'http' + port = port or default_port(interface, use_tls) + if username is None and 'user' in kwargs: + username = kwargs.pop('user') + if username is None and 'user_name' in kwargs: + username = kwargs.pop('user_name') + if password and username is None: + username = 'default' + if 'compression' in kwargs and 'compress' not in kwargs: + kwargs['compress'] = kwargs.pop('compression') + settings = settings or {} + if interface.startswith('http'): + if generic_args: + client_params = signature(HttpClient).parameters + for name, value in generic_args.items(): + if name in client_params: + kwargs[name] = value + elif name == 'compression': + if 'compress' not in kwargs: + kwargs['compress'] = value + else: + if name.startswith('ch_'): + name = name[3:] + settings[name] = value + return HttpClient(interface, host, port, username, password, database, settings=settings, **kwargs) + raise ProgrammingError(f'Unrecognized client type {interface}') + + +def default_port(interface: str, secure: bool): + if interface.startswith('http'): + return 8443 if secure else 8123 + raise ValueError('Unrecognized ClickHouse interface') diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/buffer.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/buffer.py new file mode 100644 index 0000000000..b50b9bb678 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/buffer.py @@ -0,0 +1,140 @@ +import sys +import array +from typing import Any, Iterable + +from clickhouse_connect.driver.exceptions import StreamCompleteException +from clickhouse_connect.driver.types import ByteSource + 
+must_swap = sys.byteorder == 'big' + + +class ResponseBuffer(ByteSource): + slots = 'slice_sz', 'buf_loc', 'end', 'gen', 'buffer', 'slice' + + def __init__(self, source): + self.slice_sz = 4096 + self.buf_loc = 0 + self.buf_sz = 0 + self.source = source + self.gen = source.gen + self.buffer = bytes() + + def read_bytes(self, sz: int): + if self.buf_loc + sz <= self.buf_sz: + self.buf_loc += sz + return self.buffer[self.buf_loc - sz: self.buf_loc] + # Create a temporary buffer that bridges two or more source chunks + bridge = bytearray(self.buffer[self.buf_loc: self.buf_sz]) + self.buf_loc = 0 + self.buf_sz = 0 + while len(bridge) < sz: + chunk = next(self.gen, None) + if not chunk: + raise StreamCompleteException + x = len(chunk) + if len(bridge) + x <= sz: + bridge.extend(chunk) + else: + tail = sz - len(bridge) + bridge.extend(chunk[:tail]) + self.buffer = chunk + self.buf_sz = x + self.buf_loc = tail + return bridge + + def read_byte(self) -> int: + if self.buf_loc < self.buf_sz: + self.buf_loc += 1 + return self.buffer[self.buf_loc - 1] + self.buf_sz = 0 + self.buf_loc = 0 + chunk = next(self.gen, None) + if not chunk: + raise StreamCompleteException + x = len(chunk) + if x > 1: + self.buffer = chunk + self.buf_loc = 1 + self.buf_sz = x + return chunk[0] + + def read_leb128(self) -> int: + sz = 0 + shift = 0 + while True: + b = self.read_byte() + sz += ((b & 0x7f) << shift) + if (b & 0x80) == 0: + return sz + shift += 7 + + def read_leb128_str(self) -> str: + sz = self.read_leb128() + return self.read_bytes(sz).decode() + + def read_uint64(self) -> int: + return int.from_bytes(self.read_bytes(8), 'little', signed=False) + + def read_str_col(self, + num_rows: int, + encoding: str, + nullable: bool = False, + null_obj: Any = None) -> Iterable[str]: + column = [] + app = column.append + null_map = self.read_bytes(num_rows) if nullable else None + for ix in range(num_rows): + sz = 0 + shift = 0 + while True: + b = self.read_byte() + sz += ((b & 0x7f) << shift) + if (b & 0x80) == 0: + break + shift += 7 + x = self.read_bytes(sz) + if null_map and null_map[ix]: + app(null_obj) + elif encoding: + try: + app(x.decode(encoding)) + except UnicodeDecodeError: + app(x.hex()) + else: + app(x) + return column + + def read_bytes_col(self, sz: int, num_rows: int) -> Iterable[bytes]: + source = self.read_bytes(sz * num_rows) + return [bytes(source[x:x+sz]) for x in range(0, sz * num_rows, sz)] + + def read_fixed_str_col(self, sz: int, num_rows: int, encoding: str) -> Iterable[str]: + source = self.read_bytes(sz * num_rows) + column = [] + app = column.append + for ix in range(0, sz * num_rows, sz): + try: + app(str(source[ix: ix + sz], encoding).rstrip('\x00')) + except UnicodeDecodeError: + app(source[ix: ix + sz].hex()) + return column + + def read_array(self, array_type: str, num_rows: int) -> Iterable[Any]: + column = array.array(array_type) + sz = column.itemsize * num_rows + b = self.read_bytes(sz) + column.frombytes(b) + if must_swap: + column.byteswap() + return column + + @property + def last_message(self): + if len(self.buffer) == 0: + return None + return self.buffer.decode() + + def close(self): + if self.source: + self.source.close() + self.source = None diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py new file mode 100644 index 0000000000..beaea8e0d1 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py @@ -0,0 +1,727 @@ +import io +import logging 
+from datetime import tzinfo, datetime + +import pytz + +from abc import ABC, abstractmethod +from typing import Iterable, Optional, Any, Union, Sequence, Dict, Generator, BinaryIO +from pytz.exceptions import UnknownTimeZoneError + +from clickhouse_connect import common +from clickhouse_connect.common import version +from clickhouse_connect.datatypes.registry import get_from_name +from clickhouse_connect.datatypes.base import ClickHouseType +from clickhouse_connect.driver.common import dict_copy, StreamContext, coerce_int, coerce_bool +from clickhouse_connect.driver.constants import CH_VERSION_WITH_PROTOCOL, PROTOCOL_VERSION_WITH_LOW_CARD +from clickhouse_connect.driver.exceptions import ProgrammingError, OperationalError +from clickhouse_connect.driver.external import ExternalData +from clickhouse_connect.driver.insert import InsertContext +from clickhouse_connect.driver.summary import QuerySummary +from clickhouse_connect.driver.models import ColumnDef, SettingDef, SettingStatus +from clickhouse_connect.driver.query import QueryResult, to_arrow, QueryContext, arrow_buffer + +io.DEFAULT_BUFFER_SIZE = 1024 * 256 +logger = logging.getLogger(__name__) +arrow_str_setting = 'output_format_arrow_string_as_string' + + +# pylint: disable=too-many-public-methods, too-many-instance-attributes +class Client(ABC): + """ + Base ClickHouse Connect client + """ + compression: str = None + write_compression: str = None + protocol_version = 0 + valid_transport_settings = set() + optional_transport_settings = set() + database = None + max_error_message = 0 + + def __init__(self, + database: str, + query_limit: int, + uri: str, + query_retries: int, + server_host_name: Optional[str], + apply_server_timezone: Optional[Union[str, bool]]): + """ + Shared initialization of ClickHouse Connect client + :param database: database name + :param query_limit: default LIMIT for queries + :param uri: uri for error messages + """ + self.query_limit = coerce_int(query_limit) + self.query_retries = coerce_int(query_retries) + self.server_host_name = server_host_name + self.server_tz = pytz.UTC + self.server_version, server_tz = \ + tuple(self.command('SELECT version(), timezone()', use_database=False)) + try: + self.server_tz = pytz.timezone(server_tz) + except UnknownTimeZoneError: + logger.warning('Warning, server is using an unrecognized timezone %s, will use UTC default', server_tz) + offsets_differ = datetime.now().astimezone().utcoffset() != datetime.now(tz=self.server_tz).utcoffset() + self.apply_server_timezone = apply_server_timezone == 'always' or ( + coerce_bool(apply_server_timezone) and offsets_differ) + readonly = 'readonly' + if not self.min_version('19.17'): + readonly = common.get_setting('readonly') + server_settings = self.query(f'SELECT name, value, {readonly} as readonly FROM system.settings LIMIT 10000') + self.server_settings = {row['name']: SettingDef(**row) for row in server_settings.named_results()} + if database and not database == '__default__': + self.database = database + if self.min_version(CH_VERSION_WITH_PROTOCOL): + # Unfortunately we have to validate that the client protocol version is actually used by ClickHouse + # since the query parameter could be stripped off (in particular, by CHProxy) + test_data = self.raw_query('SELECT 1 AS check', fmt='Native', settings={ + 'client_protocol_version': PROTOCOL_VERSION_WITH_LOW_CARD + }) + if test_data[8:16] == b'\x01\x01\x05check': + self.protocol_version = PROTOCOL_VERSION_WITH_LOW_CARD + self.uri = uri + + def _validate_settings(self, 
settings: Optional[Dict[str, Any]]) -> Dict[str, str]: + """ + This strips any ClickHouse settings that are not recognized or are read only. + :param settings: Dictionary of setting name and values + :return: A filtered dictionary of settings with values rendered as strings + """ + validated = {} + invalid_action = common.get_setting('invalid_setting_action') + for key, value in settings.items(): + str_value = self._validate_setting(key, value, invalid_action) + if str_value is not None: + validated[key] = value + return validated + + def _validate_setting(self, key: str, value: Any, invalid_action: str) -> Optional[str]: + if key not in self.valid_transport_settings: + setting_def = self.server_settings.get(key) + if setting_def is None or setting_def.readonly: + if key in self.optional_transport_settings: + return None + if invalid_action == 'send': + logger.warning('Attempting to send unrecognized or readonly setting %s', key) + elif invalid_action == 'drop': + logger.warning('Dropping unrecognized or readonly settings %s', key) + return None + else: + raise ProgrammingError(f'Setting {key} is unknown or readonly') from None + if isinstance(value, bool): + return '1' if value else '0' + return str(value) + + def _setting_status(self, key: str) -> SettingStatus: + comp_setting = self.server_settings.get(key) + if not comp_setting: + return SettingStatus(False, False) + return SettingStatus(comp_setting.value != '0', comp_setting.readonly != 1) + + def _prep_query(self, context: QueryContext): + if context.is_select and not context.has_limit and self.query_limit: + return f'{context.final_query}\n LIMIT {self.query_limit}' + return context.final_query + + def _check_tz_change(self, new_tz) -> Optional[tzinfo]: + if new_tz: + try: + new_tzinfo = pytz.timezone(new_tz) + if new_tzinfo != self.server_tz: + return new_tzinfo + except UnknownTimeZoneError: + logger.warning('Unrecognized timezone %s received from ClickHouse', new_tz) + return None + + @abstractmethod + def _query_with_context(self, context: QueryContext): + pass + + @abstractmethod + def set_client_setting(self, key, value): + """ + Set a clickhouse setting for the client after initialization. If a setting is not recognized by ClickHouse, + or the setting is identified as "read_only", this call will either throw a Programming exception or attempt + to send the setting anyway based on the common setting 'invalid_setting_action' + :param key: ClickHouse setting name + :param value: ClickHouse setting value + """ + + @abstractmethod + def get_client_setting(self, key) -> Optional[str]: + """ + :param key: The setting key + :return: The string value of the setting, if it exists, or None + """ + + # pylint: disable=too-many-arguments,unused-argument,too-many-locals + def query(self, + query: str = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + column_oriented: Optional[bool] = None, + use_numpy: Optional[bool] = None, + max_str_len: Optional[int] = None, + context: QueryContext = None, + query_tz: Optional[Union[str, tzinfo]] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + external_data: Optional[ExternalData] = None) -> QueryResult: + """ + Main query method for SELECT, DESCRIBE and other SQL statements that return a result matrix. 
For + parameters, see the create_query_context method + :return: QueryResult -- data and metadata from response + """ + if query and query.lower().strip().startswith('select __connect_version__'): + return QueryResult([[f'ClickHouse Connect v.{version()} ⓒ ClickHouse Inc.']], None, + ('connect_version',), (get_from_name('String'),)) + kwargs = locals().copy() + del kwargs['self'] + query_context = self.create_query_context(**kwargs) + if query_context.is_command: + response = self.command(query, + parameters=query_context.parameters, + settings=query_context.settings, + external_data=query_context.external_data) + if isinstance(response, QuerySummary): + return response.as_query_result() + return QueryResult([response] if isinstance(response, list) else [[response]]) + return self._query_with_context(query_context) + + def query_column_block_stream(self, + query: str = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + context: QueryContext = None, + query_tz: Optional[Union[str, tzinfo]] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + external_data: Optional[ExternalData] = None) -> StreamContext: + """ + Variation of main query method that returns a stream of column oriented blocks. For + parameters, see the create_query_context method. + :return: StreamContext -- Iterable stream context that returns column oriented blocks + """ + return self._context_query(locals(), use_numpy=False, streaming=True).column_block_stream + + def query_row_block_stream(self, + query: str = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + context: QueryContext = None, + query_tz: Optional[Union[str, tzinfo]] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + external_data: Optional[ExternalData] = None) -> StreamContext: + """ + Variation of main query method that returns a stream of row oriented blocks. For + parameters, see the create_query_context method. + :return: StreamContext -- Iterable stream context that returns blocks of rows + """ + return self._context_query(locals(), use_numpy=False, streaming=True).row_block_stream + + def query_rows_stream(self, + query: str = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + context: QueryContext = None, + query_tz: Optional[Union[str, tzinfo]] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + external_data: Optional[ExternalData] = None) -> StreamContext: + """ + Variation of main query method that returns a stream of row oriented blocks. For + parameters, see the create_query_context method. 
+ :return: StreamContext -- Iterable stream context that returns blocks of rows + """ + return self._context_query(locals(), use_numpy=False, streaming=True).rows_stream + + @abstractmethod + def raw_query(self, query: str, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + fmt: str = None, + use_database: bool = True, + external_data: Optional[ExternalData] = None) -> bytes: + """ + Query method that simply returns the raw ClickHouse format bytes + :param query: Query statement/format string + :param parameters: Optional dictionary used to format the query + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param fmt: ClickHouse output format + :param use_database Send the database parameter to ClickHouse so the command will be executed in the client + database context. + :param external_data External data to send with the query + :return: bytes representing raw ClickHouse return value based on format + """ + + # pylint: disable=duplicate-code,too-many-arguments,unused-argument + def query_np(self, + query: str = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, str]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + max_str_len: Optional[int] = None, + context: QueryContext = None, + external_data: Optional[ExternalData] = None): + """ + Query method that returns the results as a numpy array. For parameter values, see the + create_query_context method + :return: Numpy array representing the result set + """ + return self._context_query(locals(), use_numpy=True).np_result + + # pylint: disable=duplicate-code,too-many-arguments,unused-argument + def query_np_stream(self, + query: str = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, str]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + max_str_len: Optional[int] = None, + context: QueryContext = None, + external_data: Optional[ExternalData] = None) -> StreamContext: + """ + Query method that returns the results as a stream of numpy arrays. For parameter values, see the + create_query_context method + :return: Generator that yield a numpy array per block representing the result set + """ + return self._context_query(locals(), use_numpy=True, streaming=True).np_stream + + # pylint: disable=duplicate-code,too-many-arguments,unused-argument + def query_df(self, + query: str = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, str]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + max_str_len: Optional[int] = None, + use_na_values: Optional[bool] = None, + query_tz: Optional[str] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + context: QueryContext = None, + external_data: Optional[ExternalData] = None, + use_extended_dtypes: Optional[bool] = None): + """ + Query method that results the results as a pandas dataframe. 
For parameter values, see the + create_query_context method + :return: Pandas dataframe representing the result set + """ + return self._context_query(locals(), use_numpy=True, as_pandas=True).df_result + + # pylint: disable=duplicate-code,too-many-arguments,unused-argument + def query_df_stream(self, + query: str = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, str]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + max_str_len: Optional[int] = None, + use_na_values: Optional[bool] = None, + query_tz: Optional[str] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + context: QueryContext = None, + external_data: Optional[ExternalData] = None, + use_extended_dtypes: Optional[bool] = None) -> StreamContext: + """ + Query method that returns the results as a StreamContext. For parameter values, see the + create_query_context method + :return: Pandas dataframe representing the result set + """ + return self._context_query(locals(), use_numpy=True, + as_pandas=True, + streaming=True).df_stream + + def create_query_context(self, + query: str = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + column_oriented: Optional[bool] = None, + use_numpy: Optional[bool] = False, + max_str_len: Optional[int] = 0, + context: Optional[QueryContext] = None, + query_tz: Optional[Union[str, tzinfo]] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + use_na_values: Optional[bool] = None, + streaming: bool = False, + as_pandas: bool = False, + external_data: Optional[ExternalData] = None, + use_extended_dtypes: Optional[bool] = None) -> QueryContext: + """ + Creates or updates a reusable QueryContext object + :param query: Query statement/format string + :param parameters: Optional dictionary used to format the query + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param query_formats: See QueryContext __init__ docstring + :param column_formats: See QueryContext __init__ docstring + :param encoding: See QueryContext __init__ docstring + :param use_none: Use None for ClickHouse NULL instead of default values. Note that using None in Numpy + arrays will force the numpy array dtype to 'object', which is often inefficient. This effect also + will impact the performance of Pandas dataframes. + :param column_oriented: Deprecated. Controls orientation of the QueryResult result_set property + :param use_numpy: Return QueryResult columns as one-dimensional numpy arrays + :param max_str_len: Limit returned ClickHouse String values to this length, which allows a Numpy + structured array even with ClickHouse variable length String columns. If 0, Numpy arrays for + String columns will always be object arrays + :param context: An existing QueryContext to be updated with any provided parameter values + :param query_tz Either a string or a pytz tzinfo object. (Strings will be converted to tzinfo objects). + Values for any DateTime or DateTime64 column in the query will be converted to Python datetime.datetime + objects with the selected timezone. 
+ :param column_tzs A dictionary of column names to tzinfo objects (or strings that will be converted to + tzinfo objects). The timezone will be applied to datetime objects returned in the query + :param use_na_values: Deprecated alias for use_advanced_dtypes + :param as_pandas Return the result columns as pandas.Series objects + :param streaming Marker used to correctly configure streaming queries + :param external_data ClickHouse "external data" to send with query + :param use_extended_dtypes: Only relevant to Pandas Dataframe queries. Use Pandas "missing types", such as + pandas.NA and pandas.NaT for ClickHouse NULL values, as well as extended Pandas dtypes such as IntegerArray + and StringArray. Defaulted to True for query_df methods + :return: Reusable QueryContext + """ + if context: + return context.updated_copy(query=query, + parameters=parameters, + settings=settings, + query_formats=query_formats, + column_formats=column_formats, + encoding=encoding, + server_tz=self.server_tz, + use_none=use_none, + column_oriented=column_oriented, + use_numpy=use_numpy, + max_str_len=max_str_len, + query_tz=query_tz, + column_tzs=column_tzs, + as_pandas=as_pandas, + use_extended_dtypes=use_extended_dtypes, + streaming=streaming, + external_data=external_data) + if use_numpy and max_str_len is None: + max_str_len = 0 + if use_extended_dtypes is None: + use_extended_dtypes = use_na_values + if as_pandas and use_extended_dtypes is None: + use_extended_dtypes = True + return QueryContext(query=query, + parameters=parameters, + settings=settings, + query_formats=query_formats, + column_formats=column_formats, + encoding=encoding, + server_tz=self.server_tz, + use_none=use_none, + column_oriented=column_oriented, + use_numpy=use_numpy, + max_str_len=max_str_len, + query_tz=query_tz, + column_tzs=column_tzs, + use_extended_dtypes=use_extended_dtypes, + as_pandas=as_pandas, + streaming=streaming, + apply_server_tz=self.apply_server_timezone, + external_data=external_data) + + def query_arrow(self, + query: str, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + use_strings: Optional[bool] = None, + external_data: Optional[ExternalData] = None): + """ + Query method using the ClickHouse Arrow format to return a PyArrow table + :param query: Query statement/format string + :param parameters: Optional dictionary used to format the query + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary) + :param external_data ClickHouse "external data" to send with query + :return: PyArrow.Table + """ + settings = dict_copy(settings) + if self.database: + settings['database'] = self.database + str_status = self._setting_status(arrow_str_setting) + if use_strings is None: + if str_status.is_writable and not str_status.is_set: + settings[arrow_str_setting] = '1' # Default to returning strings if possible + elif use_strings != str_status.is_set: + if not str_status.is_writable: + raise OperationalError(f'Cannot change readonly {arrow_str_setting} to {use_strings}') + settings[arrow_str_setting] = '1' if use_strings else '0' + return to_arrow(self.raw_query(query, + parameters, + settings, + fmt='Arrow', + external_data=external_data)) + + @abstractmethod + def command(self, + cmd: str, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + data: Union[str, bytes] = None, + settings: Dict[str, Any] = None, + use_database: bool = 
True, + external_data: Optional[ExternalData] = None) -> Union[str, int, Sequence[str], QuerySummary]: + """ + Client method that returns a single value instead of a result set + :param cmd: ClickHouse query/command as a python format string + :param parameters: Optional dictionary of key/values pairs to be formatted + :param data: Optional 'data' for the command (for INSERT INTO in particular) + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param use_database: Send the database parameter to ClickHouse so the command will be executed in the client + database context. Otherwise, no database will be specified with the command. This is useful for determining + the default user database + :param external_data ClickHouse "external data" to send with command/query + :return: Decoded response from ClickHouse as either a string, int, or sequence of strings, or QuerySummary + if no data returned + """ + + @abstractmethod + def ping(self) -> bool: + """ + Validate the connection, does not throw an Exception (see debug logs) + :return: ClickHouse server is up and reachable + """ + + # pylint: disable=too-many-arguments + def insert(self, + table: Optional[str] = None, + data: Sequence[Sequence[Any]] = None, + column_names: Union[str, Iterable[str]] = '*', + database: Optional[str] = None, + column_types: Sequence[ClickHouseType] = None, + column_type_names: Sequence[str] = None, + column_oriented: bool = False, + settings: Optional[Dict[str, Any]] = None, + context: InsertContext = None) -> QuerySummary: + """ + Method to insert multiple rows/data matrix of native Python objects. If context is specified arguments + other than data are ignored + :param table: Target table + :param data: Sequence of sequences of Python data + :param column_names: Ordered list of column names or '*' if column types should be retrieved from the + ClickHouse table definition + :param database: Target database -- will use client default database if not specified. + :param column_types: ClickHouse column types. If set then column data does not need to be retrieved from + the server + :param column_type_names: ClickHouse column type names. If set then column data does not need to be + retrieved from the server + :param column_oriented: If true the data is already "pivoted" in column form + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param context: Optional reusable insert context to allow repeated inserts into the same table with + different data batches + :return: QuerySummary with summary information, throws exception if insert fails + """ + if (context is None or context.empty) and data is None: + raise ProgrammingError('No data specified for insert') from None + if context is None: + context = self.create_insert_context(table, + column_names, + database, + column_types, + column_type_names, + column_oriented, + settings) + if data is not None: + if not context.empty: + raise ProgrammingError('Attempting to insert new data with non-empty insert context') from None + context.data = data + return self.data_insert(context) + + def insert_df(self, table: str = None, + df=None, + database: Optional[str] = None, + settings: Optional[Dict] = None, + column_names: Optional[Sequence[str]] = None, + column_types: Sequence[ClickHouseType] = None, + column_type_names: Sequence[str] = None, + context: InsertContext = None) -> QuerySummary: + """ + Insert a pandas DataFrame into ClickHouse. 
If context is specified arguments other than df are ignored + :param table: ClickHouse table + :param df: two-dimensional pandas dataframe + :param database: Optional ClickHouse database + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param column_names: An optional list of ClickHouse column names. If not set, the DataFrame column names + will be used + :param column_types: ClickHouse column types. If set then column data does not need to be retrieved from + the server + :param column_type_names: ClickHouse column type names. If set then column data does not need to be + retrieved from the server + :param context: Optional reusable insert context to allow repeated inserts into the same table with + different data batches + :return: QuerySummary with summary information, throws exception if insert fails + """ + if context is None: + if column_names is None: + column_names = df.columns + elif len(column_names) != len(df.columns): + raise ProgrammingError('DataFrame column count does not match insert_columns') from None + return self.insert(table, + df, + column_names, + database, + column_types=column_types, + column_type_names=column_type_names, + settings=settings, context=context) + + def insert_arrow(self, table: str, + arrow_table, database: str = None, + settings: Optional[Dict] = None) -> QuerySummary: + """ + Insert a PyArrow table DataFrame into ClickHouse using raw Arrow format + :param table: ClickHouse table + :param arrow_table: PyArrow Table object + :param database: Optional ClickHouse database + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :return: QuerySummary with summary information, throws exception if insert fails + """ + full_table = table if '.' in table or not database else f'{database}.{table}' + column_names, insert_block = arrow_buffer(arrow_table) + return self.raw_insert(full_table, column_names, insert_block, settings, 'Arrow') + + def create_insert_context(self, + table: str, + column_names: Optional[Union[str, Sequence[str]]] = None, + database: Optional[str] = None, + column_types: Sequence[ClickHouseType] = None, + column_type_names: Sequence[str] = None, + column_oriented: bool = False, + settings: Optional[Dict[str, Any]] = None, + data: Optional[Sequence[Sequence[Any]]] = None) -> InsertContext: + """ + Builds a reusable insert context to hold state for a duration of an insert + :param table: Target table + :param database: Target database. If not set, uses the client default database + :param column_names: Optional ordered list of column names. If not set, all columns ('*') will be assumed + in the order specified by the table definition + :param database: Target database -- will use client default database if not specified + :param column_types: ClickHouse column types. Optional Sequence of ClickHouseType objects. If neither column + types nor column type names are set, actual column types will be retrieved from the server. + :param column_type_names: ClickHouse column type names. Specified column types by name string + :param column_oriented: If true the data is already "pivoted" in column form + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param data: Initial dataset for insert + :return Reusable insert context + """ + full_table = table if '.' 
in table or not database else f'{database}.{table}' + column_defs = [] + if column_types is None and column_type_names is None: + describe_result = self.query(f'DESCRIBE TABLE {full_table}') + column_defs = [ColumnDef(**row) for row in describe_result.named_results() + if row['default_type'] not in ('ALIAS', 'MATERIALIZED')] + if column_names is None or isinstance(column_names, str) and column_names == '*': + column_names = [cd.name for cd in column_defs] + column_types = [cd.ch_type for cd in column_defs] + elif isinstance(column_names, str): + column_names = [column_names] + if len(column_names) == 0: + raise ValueError('Column names must be specified for insert') + if not column_types: + if column_type_names: + column_types = [get_from_name(name) for name in column_type_names] + else: + column_map = {d.name: d for d in column_defs} + try: + column_types = [column_map[name].ch_type for name in column_names] + except KeyError as ex: + raise ProgrammingError(f'Unrecognized column {ex} in table {table}') from None + if len(column_names) != len(column_types): + raise ProgrammingError('Column names do not match column types') from None + return InsertContext(full_table, + column_names, + column_types, + column_oriented=column_oriented, + settings=settings, + data=data) + + def min_version(self, version_str: str) -> bool: + """ + Determine whether the connected server is at least the submitted version + For Altinity Stable versions like 22.8.15.25.altinitystable + the last condition in the first list comprehension expression is added + :param version_str: A version string consisting of up to 4 integers delimited by dots + :return: True if version_str is greater than the server_version, False if less than + """ + try: + server_parts = [int(x) for x in self.server_version.split('.') if x.isnumeric()] + server_parts.extend([0] * (4 - len(server_parts))) + version_parts = [int(x) for x in version_str.split('.')] + version_parts.extend([0] * (4 - len(version_parts))) + except ValueError: + logger.warning('Server %s or requested version %s does not match format of numbers separated by dots', + self.server_version, version_str) + return False + for x, y in zip(server_parts, version_parts): + if x > y: + return True + if x < y: + return False + return True + + @abstractmethod + def data_insert(self, context: InsertContext) -> QuerySummary: + """ + Subclass implementation of the data insert + :context: InsertContext parameter object + :return: No return, throws an exception if the insert fails + """ + + @abstractmethod + def raw_insert(self, table: str, + column_names: Optional[Sequence[str]] = None, + insert_block: Union[str, bytes, Generator[bytes, None, None], BinaryIO] = None, + settings: Optional[Dict] = None, + fmt: Optional[str] = None, + compression: Optional[str] = None) -> QuerySummary: + """ + Insert data already formatted in a bytes object + :param table: Table name (whether qualified with the database name or not) + :param column_names: Sequence of column names + :param insert_block: Binary or string data already in a recognized ClickHouse format + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param compression: Recognized ClickHouse `Accept-Encoding` header compression value + :param fmt: Valid clickhouse format + """ + + def close(self): + """ + Subclass implementation to close the connection to the server/deallocate the client + """ + + def _context_query(self, lcls: dict, **overrides): + kwargs = lcls.copy() + kwargs.pop('self') + 
kwargs.update(overrides) + return self._query_with_context((self.create_query_context(**kwargs))) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + self.close() diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/common.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/common.py new file mode 100644 index 0000000000..71adb00321 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/common.py @@ -0,0 +1,206 @@ +import array +import struct +import sys + +from typing import Sequence, MutableSequence, Dict, Optional, Union, Generator + +from clickhouse_connect.driver.exceptions import ProgrammingError, StreamClosedError, DataError +from clickhouse_connect.driver.types import Closable + +# pylint: disable=invalid-name +must_swap = sys.byteorder == 'big' +int_size = array.array('i').itemsize +low_card_version = 1 + +array_map = {1: 'b', 2: 'h', 4: 'i', 8: 'q'} +decimal_prec = {32: 9, 64: 18, 128: 38, 256: 79} + +if int_size == 2: + array_map[4] = 'l' + +array_sizes = {v: k for k, v in array_map.items()} +array_sizes['f'] = 4 +array_sizes['d'] = 8 +np_date_types = {0: '[s]', 3: '[ms]', 6: '[us]', 9: '[ns]'} + + +def array_type(size: int, signed: bool): + """ + Determines the Python array.array code for the requested byte size + :param size: byte size + :param signed: whether int types should be signed or unsigned + :return: Python array.array code + """ + try: + code = array_map[size] + except KeyError: + return None + return code if signed else code.upper() + + +def write_array(code: str, column: Sequence, dest: MutableSequence): + """ + Write a column of native Python data matching the array.array code + :param code: Python array.array code matching the column data type + :param column: Column of native Python values + :param dest: Destination byte buffer + """ + if len(column) and not isinstance(column[0], (int, float)): + if code in ('f', 'F', 'd', 'D'): + column = [float(x) for x in column] + else: + column = [int(x) for x in column] + try: + buff = struct.Struct(f'<{len(column)}{code}') + dest += buff.pack(*column) + except (TypeError, OverflowError, struct.error) as ex: + raise DataError('Unable to create Python array. 
This is usually caused by trying to insert None ' + + 'values into a ClickHouse column that is not Nullable') from ex + + +def write_uint64(value: int, dest: MutableSequence): + """ + Write a single UInt64 value to a binary write buffer + :param value: UInt64 value to write + :param dest: Destination byte buffer + """ + dest.extend(value.to_bytes(8, 'little')) + + +def write_leb128(value: int, dest: MutableSequence): + """ + Write a LEB128 encoded integer to a target binary buffer + :param value: Integer value (positive only) + :param dest: Target buffer + """ + while True: + b = value & 0x7f + value >>= 7 + if value == 0: + dest.append(b) + return + dest.append(0x80 | b) + + +def decimal_size(prec: int): + """ + Determine the bit size of a ClickHouse or Python Decimal needed to store a value of the requested precision + :param prec: Precision of the Decimal in total number of base 10 digits + :return: Required bit size + """ + if prec < 1 or prec > 79: + raise ArithmeticError(f'Invalid precision {prec} for ClickHouse Decimal type') + if prec < 10: + return 32 + if prec < 19: + return 64 + if prec < 39: + return 128 + return 256 + + +def unescape_identifier(x: str) -> str: + if x.startswith('`') and x.endswith('`'): + return x[1:-1] + return x + + +def dict_copy(source: Dict = None, update: Optional[Dict] = None) -> Dict: + copy = source.copy() if source else {} + if update: + copy.update(update) + return copy + + +def empty_gen(): + yield from () + + +def coerce_int(val: Optional[Union[str, int]]) -> int: + if not val: + return 0 + return int(val) + + +def coerce_bool(val: Optional[Union[str, bool]]): + if not val: + return False + return val in (True, 'True', 'true', '1') + + +class SliceView(Sequence): + """ + Provides a view into a sequence rather than copying. Borrows liberally from + https://gist.github.com/mathieucaroff/0cf094325fb5294fb54c6a577f05a2c1 + Also see the discussion on SO: https://stackoverflow.com/questions/3485475/can-i-create-a-view-on-a-python-list + """ + slots = ('_source', '_range') + + def __init__(self, source: Sequence, source_slice: Optional[slice] = None): + if isinstance(source, SliceView): + self._source = source._source + self._range = source._range[source_slice] + else: + self._source = source + if source_slice is None: + self._range = range(len(source)) + else: + self._range = range(len(source))[source_slice] + + def __len__(self): + return len(self._range) + + def __getitem__(self, i): + if isinstance(i, slice): + return SliceView(self._source, i) + return self._source[self._range[i]] + + def __str__(self): + r = self._range + return str(self._source[slice(r.start, r.stop, r.step)]) + + def __repr__(self): + r = self._range + return f'SliceView({self._source[slice(r.start, r.stop, r.step)]})' + + def __eq__(self, other): + if self is other: + return True + if len(self) != len(other): + return False + for v, w in zip(self, other): + if v != w: + return False + return True + + +class StreamContext: + """ + Wraps a generator and its "source" in a Context. 
This ensures that the source will be "closed" even if the + generator is not fully consumed or there is an exception during consumption + """ + __slots__ = 'source', 'gen', '_in_context' + + def __init__(self, source: Closable, gen: Generator): + self.source = source + self.gen = gen + self._in_context = False + + def __iter__(self): + return self + + def __next__(self): + if not self._in_context: + raise ProgrammingError('Stream should be used within a context') + return next(self.gen) + + def __enter__(self): + if not self.gen: + raise StreamClosedError + self._in_context = True + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._in_context = False + self.source.close() + self.gen = None diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/compression.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/compression.py new file mode 100644 index 0000000000..db69ae3f04 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/compression.py @@ -0,0 +1,77 @@ +import zlib +from abc import abstractmethod +from typing import Union + +import lz4 +import lz4.frame +import zstandard + +try: + import brotli +except ImportError: + brotli = None + + +available_compression = ['lz4', 'zstd'] + +if brotli: + available_compression.append('br') +available_compression.extend(['gzip', 'deflate']) + +comp_map = {} + + +class Compressor: + def __init_subclass__(cls, tag: str, thread_safe: bool = True): + comp_map[tag] = cls() if thread_safe else cls + + @abstractmethod + def compress_block(self, block) -> Union[bytes, bytearray]: + return block + + def flush(self): + pass + + +class GzipCompressor(Compressor, tag='gzip', thread_safe=False): + def __init__(self, level: int = 6, wbits: int = 31): + self.zlib_obj = zlib.compressobj(level=level, wbits=wbits) + + def compress_block(self, block): + return self.zlib_obj.compress(block) + + def flush(self): + return self.zlib_obj.flush() + + +class Lz4Compressor(Compressor, tag='lz4', thread_safe=False): + def __init__(self): + self.comp = lz4.frame.LZ4FrameCompressor() + + def compress_block(self, block): + output = self.comp.begin(len(block)) + output += self.comp.compress(block) + return output + self.comp.flush() + + +class ZstdCompressor(Compressor, tag='zstd'): + def compress_block(self, block): + return zstandard.compress(block) + + +class BrotliCompressor(Compressor, tag='br'): + def compress_block(self, block): + return brotli.compress(block) + + +null_compressor = Compressor() + + +def get_compressor(compression: str) -> Compressor: + if not compression: + return null_compressor + comp = comp_map[compression] + try: + return comp() + except TypeError: + return comp diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/constants.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/constants.py new file mode 100644 index 0000000000..a242e559b9 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/constants.py @@ -0,0 +1,2 @@ +PROTOCOL_VERSION_WITH_LOW_CARD = 54405 +CH_VERSION_WITH_PROTOCOL = '23.2.1.2537' diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/context.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/context.py new file mode 100644 index 0000000000..7984fbeebb --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/context.py @@ -0,0 +1,72 @@ +import logging +import re +from datetime import datetime +from typing import Optional, Dict, Union, Any + +import 
pytz + +logger = logging.getLogger(__name__) + +_empty_map = {} + + +# pylint: disable=too-many-instance-attributes +class BaseQueryContext: + local_tz: pytz.timezone + + def __init__(self, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + use_extended_dtypes: bool = False, + use_numpy: bool = False): + self.settings = settings or {} + if query_formats is None: + self.type_formats = _empty_map + else: + self.type_formats = {re.compile(type_name.replace('*', '.*'), re.IGNORECASE): fmt + for type_name, fmt in query_formats.items()} + if column_formats is None: + self.col_simple_formats = _empty_map + self.col_type_formats = _empty_map + else: + self.col_simple_formats = {col_name: fmt for col_name, fmt in column_formats.items() if + isinstance(fmt, str)} + self.col_type_formats = {} + for col_name, fmt in column_formats.items(): + if not isinstance(fmt, str): + self.col_type_formats[col_name] = {re.compile(type_name.replace('*', '.*'), re.IGNORECASE): fmt + for type_name, fmt in fmt.items()} + self.query_formats = query_formats or {} + self.column_formats = column_formats or {} + self.encoding = encoding + self.use_numpy = use_numpy + self.use_extended_dtypes = use_extended_dtypes + self._active_col_fmt = None + self._active_col_type_fmts = _empty_map + + def start_column(self, name: str): + self._active_col_fmt = self.col_simple_formats.get(name) + self._active_col_type_fmts = self.col_type_formats.get(name, _empty_map) + + def active_fmt(self, ch_type): + if self._active_col_fmt: + return self._active_col_fmt + for type_pattern, fmt in self._active_col_type_fmts.items(): + if type_pattern.match(ch_type): + return fmt + for type_pattern, fmt in self.type_formats.items(): + if type_pattern.match(ch_type): + return fmt + return None + + +def _init_context_cls(): + local_tz = datetime.now().astimezone().tzinfo + if local_tz.tzname(datetime.now()) in ('UTC', 'GMT', 'Universal', 'GMT-0', 'Zulu', 'Greenwich'): + local_tz = pytz.UTC + BaseQueryContext.local_tz = local_tz + + +_init_context_cls() diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/ctypes.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/ctypes.py new file mode 100644 index 0000000000..e7bb607e68 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/ctypes.py @@ -0,0 +1,49 @@ +import logging +import os + +import clickhouse_connect.driver.dataconv as pydc +import clickhouse_connect.driver.npconv as pync +from clickhouse_connect.driver.buffer import ResponseBuffer +from clickhouse_connect.driver.common import coerce_bool + +logger = logging.getLogger(__name__) + +RespBuffCls = ResponseBuffer +data_conv = pydc +numpy_conv = pync + + +# pylint: disable=import-outside-toplevel,global-statement + +def connect_c_modules(): + if not coerce_bool(os.environ.get('CLICKHOUSE_CONNECT_USE_C', True)): + logger.info('ClickHouse Connect C optimizations disabled') + return + + global RespBuffCls, data_conv + try: + from clickhouse_connect.driverc.buffer import ResponseBuffer as CResponseBuffer + import clickhouse_connect.driverc.dataconv as cdc + + data_conv = cdc + RespBuffCls = CResponseBuffer + logger.debug('Successfully imported ClickHouse Connect C data optimizations') + connect_numpy() + except ImportError as ex: + logger.warning('Unable to connect optimized C data functions [%s], falling back to pure Python', + str(ex)) + + +def 
connect_numpy(): + global numpy_conv + try: + import clickhouse_connect.driverc.npconv as cnc + + numpy_conv = cnc + logger.debug('Successfully import ClickHouse Connect C/Numpy optimizations') + except ImportError as ex: + logger.debug('Unable to connect ClickHouse Connect C to Numpy API [%s], falling back to pure Python', + str(ex)) + + +connect_c_modules() diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/dataconv.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/dataconv.py new file mode 100644 index 0000000000..29c96a9a66 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/dataconv.py @@ -0,0 +1,129 @@ +import array +from datetime import datetime, date, tzinfo +from ipaddress import IPv4Address +from typing import Sequence, Optional, Any +from uuid import UUID, SafeUUID + +from clickhouse_connect.driver.common import int_size +from clickhouse_connect.driver.types import ByteSource +from clickhouse_connect.driver.options import np + + +MONTH_DAYS = (0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334, 365) +MONTH_DAYS_LEAP = (0, 31, 60, 91, 121, 152, 182, 213, 244, 274, 305, 335, 366) + + +def read_ipv4_col(source: ByteSource, num_rows: int): + column = source.read_array('I', num_rows) + fast_ip_v4 = IPv4Address.__new__ + new_col = [] + app = new_col.append + for x in column: + ipv4 = fast_ip_v4(IPv4Address) + ipv4._ip = x # pylint: disable=protected-access + app(ipv4) + return new_col + + +def read_datetime_col(source: ByteSource, num_rows: int, tz_info: Optional[tzinfo]): + src_array = source.read_array('I', num_rows) + if tz_info is None: + fts = datetime.utcfromtimestamp + return [fts(ts) for ts in src_array] + fts = datetime.fromtimestamp + return [fts(ts, tz_info) for ts in src_array] + + +def epoch_days_to_date(days: int) -> date: + cycles400, rem = divmod(days + 134774, 146097) + cycles100, rem = divmod(rem, 36524) + cycles, rem = divmod(rem, 1461) + years, rem = divmod(rem, 365) + year = (cycles << 2) + cycles400 * 400 + cycles100 * 100 + years + 1601 + if years == 4 or cycles100 == 4: + return date(year - 1, 12, 31) + m_list = MONTH_DAYS_LEAP if years == 3 and (year == 2000 or year % 100 != 0) else MONTH_DAYS + month = (rem + 24) >> 5 + while rem < m_list[month]: + month -= 1 + return date(year, month + 1, rem + 1 - m_list[month]) + + +def read_date_col(source: ByteSource, num_rows: int): + column = source.read_array('H', num_rows) + return [epoch_days_to_date(x) for x in column] + + +def read_date32_col(source: ByteSource, num_rows: int): + column = source.read_array('l' if int_size == 2 else 'i', num_rows) + return [epoch_days_to_date(x) for x in column] + + +def read_uuid_col(source: ByteSource, num_rows: int): + v = source.read_array('Q', num_rows * 2) + empty_uuid = UUID(int=0) + new_uuid = UUID.__new__ + unsafe = SafeUUID.unsafe + oset = object.__setattr__ + column = [] + app = column.append + for i in range(num_rows): + ix = i << 1 + int_value = v[ix] << 64 | v[ix + 1] + if int_value == 0: + app(empty_uuid) + else: + fast_uuid = new_uuid(UUID) + oset(fast_uuid, 'int', int_value) + oset(fast_uuid, 'is_safe', unsafe) + app(fast_uuid) + return column + + +def read_nullable_array(source: ByteSource, array_type: str, num_rows: int, null_obj: Any): + null_map = source.read_bytes(num_rows) + column = source.read_array(array_type, num_rows) + return [null_obj if null_map[ix] else column[ix] for ix in range(num_rows)] + + +def build_nullable_column(source: Sequence, null_map: bytes, null_obj: Any): + return 
[source[ix] if null_map[ix] == 0 else null_obj for ix in range(len(source))] + + +def build_lc_nullable_column(index: Sequence, keys: array.array, null_obj: Any): + column = [] + for key in keys: + if key == 0: + column.append(null_obj) + else: + column.append(index[key]) + return column + + +def to_numpy_array(column: Sequence): + arr = np.empty((len(column),), dtype=np.object) + arr[:] = column + return arr + + +def pivot(data: Sequence[Sequence], start_row: int, end_row: int) -> Sequence[Sequence]: + return tuple(zip(*data[start_row: end_row])) + + +def write_str_col(column: Sequence, encoding: Optional[str], dest: bytearray): + app = dest.append + for x in column: + if not x: + app(0) + else: + if encoding: + x = x.encode(encoding) + sz = len(x) + while True: + b = sz & 0x7f + sz >>= 7 + if sz == 0: + app(b) + break + app(0x80 | b) + dest += x diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/ddl.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/ddl.py new file mode 100644 index 0000000000..a9a1a4b0aa --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/ddl.py @@ -0,0 +1,28 @@ +from typing import NamedTuple, Sequence + +from clickhouse_connect.datatypes.base import ClickHouseType + + +class TableColumnDef(NamedTuple): + """ + Simplified ClickHouse Table Column definition for DDL + """ + name: str + ch_type: ClickHouseType + expr_type: str = None + expr: str = None + + @property + def col_expr(self): + expr = f'{self.name} {self.ch_type.name}' + if self.expr_type: + expr += f' {self.expr_type} {self.expr}' + return expr + + +def create_table(table_name: str, columns: Sequence[TableColumnDef], engine: str, engine_params: dict): + stmt = f"CREATE TABLE {table_name} ({', '.join(col.col_expr for col in columns)}) ENGINE {engine} " + if engine_params: + for key, value in engine_params.items(): + stmt += f' {key} {value}' + return stmt diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/exceptions.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/exceptions.py new file mode 100644 index 0000000000..1cd41f9933 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/exceptions.py @@ -0,0 +1,84 @@ +""" +The driver exception classes here include all named exceptions required by th DB API 2.0 specification. It's not clear +how useful that naming convention is, but the convention is used for potential improved compatibility with other +libraries. 
In most cases docstrings are taken from the DB API 2.0 documentation
+"""
+
+
+class ClickHouseError(Exception):
+    """Exception related to operation with ClickHouse."""
+
+
+# pylint: disable=redefined-builtin
+class Warning(Warning, ClickHouseError):
+    """Exception raised for important warnings like data truncations
+    while inserting, etc."""
+
+
+class Error(ClickHouseError):
+    """Exception that is the base class of all other error exceptions
+    (not Warning)."""
+
+
+class InterfaceError(Error):
+    """Exception raised for errors that are related to the database
+    interface rather than the database itself."""
+
+
+class DatabaseError(Error):
+    """Exception raised for errors that are related to the
+    database."""
+
+
+class DataError(DatabaseError):
+    """Exception raised for errors that are due to problems with the
+    processed data like division by zero, numeric value out of range,
+    etc."""
+
+
+class OperationalError(DatabaseError):
+    """Exception raised for errors that are related to the database's
+    operation and not necessarily under the control of the programmer,
+    e.g. an unexpected disconnect occurs, the data source name is not
+    found, a transaction could not be processed, a memory allocation
+    error occurred during processing, etc."""
+
+
+class IntegrityError(DatabaseError):
+    """Exception raised when the relational integrity of the database
+    is affected, e.g. a foreign key check fails, duplicate key,
+    etc."""
+
+
+class InternalError(DatabaseError):
+    """Exception raised when the database encounters an internal
+    error, e.g. the cursor is not valid anymore, the transaction is
+    out of sync, etc."""
+
+
+class ProgrammingError(DatabaseError):
+    """Exception raised for programming errors, e.g. table not found
+    or already exists, syntax error in the SQL statement, wrong number
+    of parameters specified, etc."""
+
+
+class NotSupportedError(DatabaseError):
+    """Exception raised in case a method or database API was used
+    which is not supported by the database, e.g. 
requesting a + .rollback() on a connection that does not support transaction or + has transactions turned off.""" + + +class StreamClosedError(ProgrammingError): + """Exception raised when a stream operation is executed on a closed stream.""" + + def __init__(self): + super().__init__('Executing a streaming operation on a closed stream') + + +class StreamCompleteException(Exception): + """ Internal exception used to indicate the end of a ClickHouse query result stream.""" + + +class StreamFailureError(Exception): + """ Stream failed unexpectedly """ diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/external.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/external.py new file mode 100644 index 0000000000..2d34f71ba8 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/external.py @@ -0,0 +1,127 @@ +import logging +from typing import Optional, Sequence, Dict, Union +from pathlib import Path + +from clickhouse_connect.driver.exceptions import ProgrammingError + +logger = logging.getLogger(__name__) + + +class ExternalFile: + # pylint: disable=too-many-branches + def __init__(self, + file_path: Optional[str] = None, + file_name: Optional[str] = None, + data: Optional[bytes] = None, + fmt: Optional[str] = None, + types: Optional[Union[str, Sequence[str]]] = None, + structure: Optional[Union[str, Sequence[str]]] = None, + mime_type: Optional[str] = None): + if file_path: + if data: + raise ProgrammingError('Only data or file_path should be specified for external data, not both') + try: + with open(file_path, 'rb') as file: + self.data = file.read() + except OSError as ex: + raise ProgrammingError(f'Failed to open file {file_path} for external data') from ex + path_name = Path(file_path).name + path_base = path_name.rsplit('.', maxsplit=1)[0] + if not file_name: + self.name = path_base + self.file_name = path_name + else: + self.name = file_name.rsplit('.', maxsplit=1)[0] + self.file_name = file_name + if file_name != path_name and path_base != self.name: + logger.warning('External data name %s and file_path %s use different names', file_name, path_name) + elif data: + if not file_name: + raise ProgrammingError('Name is required for query external data') + self.data = data + self.name = file_name.rsplit('.', maxsplit=1)[0] + self.file_name = file_name + else: + raise ProgrammingError('Either data or file_path must be specified for external data') + if types: + if structure: + raise ProgrammingError('Only types or structure should be specified for external data, not both') + self.structure = None + if isinstance(types, str): + self.types = types + else: + self.types = ','.join(types) + elif structure: + self.types = None + if isinstance(structure, str): + self.structure = structure + else: + self.structure = ','.join(structure) + self.fmt = fmt + self.mime_type = mime_type or 'application/octet-stream' + + @property + def form_data(self) -> tuple: + return self.file_name, self.data, self.mime_type + + @property + def query_params(self) -> Dict[str, str]: + params = {} + for name, value in (('format', self.fmt), + ('structure', self.structure), + ('types', self.types)): + if value: + params[f'{self.name}_{name}'] = value + return params + + +class ExternalData: + def __init__(self, + file_path: Optional[str] = None, + file_name: Optional[str] = None, + data: Optional[bytes] = None, + fmt: Optional[str] = None, + types: Optional[Union[str, Sequence[str]]] = None, + structure: Optional[Union[str, Sequence[str]]] = None, + mime_type: 
Optional[str] = None): + self.files: list[ExternalFile] = [] + if file_path or data: + first_file = ExternalFile(file_path=file_path, + file_name=file_name, + data=data, + fmt=fmt, + types=types, + structure=structure, + mime_type=mime_type) + self.files.append(first_file) + + def add_file(self, + file_path: Optional[str] = None, + file_name: Optional[str] = None, + data: Optional[bytes] = None, + fmt: Optional[str] = None, + types: Optional[Union[str, Sequence[str]]] = None, + structure: Optional[Union[str, Sequence[str]]] = None, + mime_type: Optional[str] = None): + self.files.append(ExternalFile(file_path=file_path, + file_name=file_name, + data=data, + fmt=fmt, + types=types, + structure=structure, + mime_type=mime_type)) + + @property + def form_data(self) -> Dict[str, tuple]: + if not self.files: + raise ProgrammingError('No external files set for external data') + return {file.name: file.form_data for file in self.files} + + @property + def query_params(self) -> Dict[str, str]: + if not self.files: + raise ProgrammingError('No external files set for external data') + params = {} + for file in self.files: + params.update(file.query_params) + return params diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py new file mode 100644 index 0000000000..c8fa9e6116 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py @@ -0,0 +1,473 @@ +import json +import logging +import re +import uuid +from base64 import b64encode +from typing import Optional, Dict, Any, Sequence, Union, List, Callable, Generator, BinaryIO +from urllib.parse import urlencode + +from urllib3 import Timeout +from urllib3.exceptions import HTTPError +from urllib3.poolmanager import PoolManager +from urllib3.response import HTTPResponse + +from clickhouse_connect import common +from clickhouse_connect.datatypes import registry +from clickhouse_connect.datatypes.base import ClickHouseType +from clickhouse_connect.driver.ctypes import RespBuffCls +from clickhouse_connect.driver.client import Client +from clickhouse_connect.driver.common import dict_copy, coerce_bool, coerce_int +from clickhouse_connect.driver.compression import available_compression +from clickhouse_connect.driver.exceptions import DatabaseError, OperationalError, ProgrammingError +from clickhouse_connect.driver.external import ExternalData +from clickhouse_connect.driver.httputil import ResponseSource, get_pool_manager, get_response_data, \ + default_pool_manager, get_proxy_manager, all_managers, check_env_proxy, check_conn_reset +from clickhouse_connect.driver.insert import InsertContext +from clickhouse_connect.driver.summary import QuerySummary +from clickhouse_connect.driver.query import QueryResult, QueryContext, quote_identifier, bind_query +from clickhouse_connect.driver.transform import NativeTransform + +logger = logging.getLogger(__name__) +columns_only_re = re.compile(r'LIMIT 0\s*$', re.IGNORECASE) + + +# pylint: disable=too-many-instance-attributes +class HttpClient(Client): + params = {} + valid_transport_settings = {'database', 'buffer_size', 'session_id', + 'compress', 'decompress', 'session_timeout', + 'session_check', 'query_id', 'quota_key', + 'wait_end_of_query', 'client_protocol_version'} + optional_transport_settings = {'send_progress_in_http_headers', + 'http_headers_progress_interval_ms', + 'enable_http_compression'} + _owns_pool_manager = False + + # pylint: 
disable=too-many-arguments,too-many-locals,too-many-branches,too-many-statements,unused-argument + def __init__(self, + interface: str, + host: str, + port: int, + username: str, + password: str, + database: str, + compress: Union[bool, str] = True, + query_limit: int = 0, + query_retries: int = 2, + connect_timeout: int = 10, + send_receive_timeout: int = 300, + client_name: Optional[str] = None, + verify: bool = True, + ca_cert: Optional[str] = None, + client_cert: Optional[str] = None, + client_cert_key: Optional[str] = None, + session_id: Optional[str] = None, + settings: Optional[Dict[str, Any]] = None, + pool_mgr: Optional[PoolManager] = None, + http_proxy: Optional[str] = None, + https_proxy: Optional[str] = None, + server_host_name: Optional[str] = None, + apply_server_timezone: Optional[Union[str, bool]] = True): + """ + Create an HTTP ClickHouse Connect client + See clickhouse_connect.get_client for parameters + """ + self.url = f'{interface}://{host}:{port}' + self.headers = {} + ch_settings = settings or {} + self.http = pool_mgr + if interface == 'https': + if not https_proxy: + https_proxy = check_env_proxy('https', host, port) + if client_cert: + if not username: + raise ProgrammingError('username parameter is required for Mutual TLS authentication') + self.headers['X-ClickHouse-User'] = username + self.headers['X-ClickHouse-SSL-Certificate-Auth'] = 'on' + verify = coerce_bool(verify) + # pylint: disable=too-many-boolean-expressions + if not self.http and (server_host_name or ca_cert or client_cert or not verify or https_proxy): + options = { + 'ca_cert': ca_cert, + 'client_cert': client_cert, + 'verify': verify, + 'client_cert_key': client_cert_key + } + if server_host_name: + if verify: + options['assert_hostname'] = server_host_name + options['server_hostname'] = server_host_name + self.http = get_pool_manager(https_proxy=https_proxy, **options) + self._owns_pool_manager = True + if not self.http: + if not http_proxy: + http_proxy = check_env_proxy('http', host, port) + if http_proxy: + self.http = get_proxy_manager(host, http_proxy) + else: + self.http = default_pool_manager() + + if not client_cert and username: + self.headers['Authorization'] = 'Basic ' + b64encode(f'{username}:{password}'.encode()).decode() + self.headers['User-Agent'] = common.build_client_name(client_name) + self._read_format = self._write_format = 'Native' + self._transform = NativeTransform() + + connect_timeout, send_receive_timeout = coerce_int(connect_timeout), coerce_int(send_receive_timeout) + self.timeout = Timeout(connect=connect_timeout, read=send_receive_timeout) + self.http_retries = 1 + self._send_progress = None + self._send_comp_setting = False + self._progress_interval = None + self._active_session = None + + if session_id: + ch_settings['session_id'] = session_id + elif 'session_id' not in ch_settings and common.get_setting('autogenerate_session_id'): + ch_settings['session_id'] = str(uuid.uuid4()) + + if coerce_bool(compress): + compression = ','.join(available_compression) + self.write_compression = available_compression[0] + elif compress and compress not in ('False', 'false', '0'): + if compress not in available_compression: + raise ProgrammingError(f'Unsupported compression method {compress}') + compression = compress + self.write_compression = compress + else: + compression = None + + super().__init__(database=database, + uri=self.url, + query_limit=query_limit, + query_retries=query_retries, + server_host_name=server_host_name, + apply_server_timezone=apply_server_timezone) + 
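+        # Validate any session-level ClickHouse settings, then check which of the optional
+        # transport settings (enable_http_compression, send_progress_in_http_headers and
+        # http_headers_progress_interval_ms) the server reports as settable for this user.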
self.params = self._validate_settings(ch_settings) + comp_setting = self._setting_status('enable_http_compression') + self._send_comp_setting = not comp_setting.is_set and comp_setting.is_writable + if comp_setting.is_set or comp_setting.is_writable: + self.compression = compression + send_setting = self._setting_status('send_progress_in_http_headers') + self._send_progress = not send_setting.is_set and send_setting.is_writable + if (send_setting.is_set or send_setting.is_writable) and \ + self._setting_status('http_headers_progress_interval_ms').is_writable: + self._progress_interval = str(min(120000, max(10000, (send_receive_timeout - 5) * 1000))) + + def set_client_setting(self, key, value): + str_value = self._validate_setting(key, value, common.get_setting('invalid_setting_action')) + if str_value is not None: + self.params[key] = str_value + + def get_client_setting(self, key) -> Optional[str]: + values = self.params.get(key) + return values[0] if values else None + + def _prep_query(self, context: QueryContext): + final_query = super()._prep_query(context) + if context.is_insert: + return final_query + return f'{final_query}\n FORMAT {self._write_format}' + + def _query_with_context(self, context: QueryContext) -> QueryResult: + headers = {} + params = {} + if self.database: + params['database'] = self.database + if self.protocol_version: + params['client_protocol_version'] = self.protocol_version + context.block_info = True + params.update(context.bind_params) + params.update(self._validate_settings(context.settings)) + if columns_only_re.search(context.uncommented_query): + response = self._raw_request(f'{context.final_query}\n FORMAT JSON', + params, headers, retries=self.query_retries) + json_result = json.loads(response.data) + # ClickHouse will respond with a JSON object of meta, data, and some other objects + # We just grab the column names and column types from the metadata sub object + names: List[str] = [] + types: List[ClickHouseType] = [] + for col in json_result['meta']: + names.append(col['name']) + types.append(registry.get_from_name(col['type'])) + return QueryResult([], None, tuple(names), tuple(types)) + + if self.compression: + headers['Accept-Encoding'] = self.compression + if self._send_comp_setting: + params['enable_http_compression'] = '1' + final_query = self._prep_query(context) + if context.external_data: + body = bytes() + params['query'] = final_query + params.update(context.external_data.query_params) + fields = context.external_data.form_data + else: + body = final_query + fields = None + headers['Content-Type'] = 'text/plain; charset=utf-8' + response = self._raw_request(body, + params, + headers, + stream=True, + retries=self.query_retries, + fields=fields, + server_wait=not context.streaming) + byte_source = RespBuffCls(ResponseSource(response)) # pylint: disable=not-callable + context.set_response_tz(self._check_tz_change(response.headers.get('X-ClickHouse-Timezone'))) + query_result = self._transform.parse_response(byte_source, context) + query_result.summary = self._summary(response) + return query_result + + def data_insert(self, context: InsertContext) -> QuerySummary: + """ + See BaseClient doc_string for this method + """ + if context.empty: + logger.debug('No data included in insert, skipping') + return QuerySummary() + + def error_handler(resp: HTTPResponse): + # If we actually had a local exception when building the insert, throw that instead + if context.insert_exception: + ex = context.insert_exception + context.insert_exception = None + 
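+                # Surface the stored client-side exception; otherwise fall through to the
+                # generic HTTP error handler below.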
raise ex + self._error_handler(resp) + + headers = {'Content-Type': 'application/octet-stream'} + if context.compression is None: + context.compression = self.write_compression + if context.compression: + headers['Content-Encoding'] = context.compression + block_gen = self._transform.build_insert(context) + + params = {} + if self.database: + params['database'] = self.database + params.update(self._validate_settings(context.settings)) + + response = self._raw_request(block_gen, params, headers, error_handler=error_handler, server_wait=False) + logger.debug('Context insert response code: %d, content: %s', response.status, response.data) + context.data = None + return QuerySummary(self._summary(response)) + + def raw_insert(self, table: str = None, + column_names: Optional[Sequence[str]] = None, + insert_block: Union[str, bytes, Generator[bytes, None, None], BinaryIO] = None, + settings: Optional[Dict] = None, + fmt: Optional[str] = None, + compression: Optional[str] = None) -> QuerySummary: + """ + See BaseClient doc_string for this method + """ + params = {} + headers = {'Content-Type': 'application/octet-stream'} + if compression: + headers['Content-Encoding'] = compression + if table: + cols = f" ({', '.join([quote_identifier(x) for x in column_names])})" if column_names is not None else '' + query = f'INSERT INTO {table}{cols} FORMAT {fmt if fmt else self._write_format}' + if not compression and isinstance(insert_block, str): + insert_block = query + '\n' + insert_block + elif not compression and isinstance(insert_block, (bytes, bytearray, BinaryIO)): + insert_block = (query + '\n').encode() + insert_block + else: + params['query'] = query + if self.database: + params['database'] = self.database + params.update(self._validate_settings(settings or {})) + response = self._raw_request(insert_block, params, headers, server_wait=False) + logger.debug('Raw insert response code: %d, content: %s', response.status, response.data) + return QuerySummary(self._summary(response)) + + @staticmethod + def _summary(response: HTTPResponse): + summary = {} + if 'X-ClickHouse-Summary' in response.headers: + try: + summary = json.loads(response.headers['X-ClickHouse-Summary']) + except json.JSONDecodeError: + pass + summary['query_id'] = response.headers.get('X-ClickHouse-Query-Id', '') + return summary + + def command(self, + cmd, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + data: Union[str, bytes] = None, + settings: Optional[Dict] = None, + use_database: int = True, + external_data: Optional[ExternalData] = None) -> Union[str, int, Sequence[str], QuerySummary]: + """ + See BaseClient doc_string for this method + """ + cmd, params = bind_query(cmd, parameters, self.server_tz) + headers = {} + payload = None + fields = None + if external_data: + if data: + raise ProgrammingError('Cannot combine command data with external data') from None + fields = external_data.form_data + params.update(external_data.query_params) + elif isinstance(data, str): + headers['Content-Type'] = 'text/plain; charset=utf-8' + payload = data.encode() + elif isinstance(data, bytes): + headers['Content-Type'] = 'application/octet-stream' + payload = data + if payload is None and not cmd: + raise ProgrammingError('Command sent without query or recognized data') from None + if payload or fields: + params['query'] = cmd + else: + payload = cmd + if use_database and self.database: + params['database'] = self.database + params.update(self._validate_settings(settings or {})) + + method = 'POST' if payload or fields 
else 'GET' + response = self._raw_request(payload, params, headers, method, fields=fields) + if response.data: + try: + result = response.data.decode()[:-1].split('\t') + if len(result) == 1: + try: + return int(result[0]) + except ValueError: + return result[0] + return result + except UnicodeDecodeError: + return str(response.data) + return QuerySummary(self._summary(response)) + + def _error_handler(self, response: HTTPResponse, retried: bool = False) -> None: + err_str = f'HTTPDriver for {self.url} returned response code {response.status})' + try: + err_content = get_response_data(response) + except Exception: # pylint: disable=broad-except + err_content = None + finally: + response.close() + + if err_content: + err_msg = common.format_error(err_content.decode(errors='backslashreplace')) + err_str = f':{err_str}\n {err_msg}' + raise OperationalError(err_str) if retried else DatabaseError(err_str) from None + + def _raw_request(self, + data, + params: Dict[str, str], + headers: Optional[Dict[str, Any]] = None, + method: str = 'POST', + retries: int = 0, + stream: bool = False, + server_wait: bool = True, + fields: Optional[Dict[str, tuple]] = None, + error_handler: Callable = None) -> HTTPResponse: + if isinstance(data, str): + data = data.encode() + headers = dict_copy(self.headers, headers) + attempts = 0 + if server_wait: + params['wait_end_of_query'] = '1' + # We can't actually read the progress headers, but we enable them so ClickHouse sends something + # to keep the connection alive when waiting for long-running queries and (2) to get summary information + # if not streaming + if self._send_progress: + params['send_progress_in_http_headers'] = '1' + if self._progress_interval: + params['http_headers_progress_interval_ms'] = self._progress_interval + final_params = dict_copy(self.params, params) + url = f'{self.url}?{urlencode(final_params)}' + kwargs = { + 'headers': headers, + 'timeout': self.timeout, + 'retries': self.http_retries, + 'preload_content': not stream + } + if self.server_host_name: + kwargs['assert_same_host'] = False + kwargs['headers'].update({'Host': self.server_host_name}) + if fields: + kwargs['fields'] = fields + else: + kwargs['body'] = data + check_conn_reset(self.http) + query_session = final_params.get('session_id') + while True: + attempts += 1 + if query_session: + if query_session == self._active_session: + raise ProgrammingError('Attempt to execute concurrent queries within the same session.' + + 'Please use a separate client instance per thread/process.') + # There is a race condition here when using multiprocessing -- in that case the server will + # throw an error instead, but in most cases this more helpful error will be thrown first + self._active_session = query_session + try: + response = self.http.request(method, url, **kwargs) + except HTTPError as ex: + if isinstance(ex.__context__, ConnectionResetError): + # The server closed the connection, probably because the Keep Alive has expired + # We should be safe to retry, as ClickHouse should not have processed anything on a connection + # that it killed. 
We also only retry this once, as multiple disconnects are unlikely to be + # related to the Keep Alive settings + if attempts == 1: + logger.debug('Retrying remotely closed connection') + continue + logger.warning('Unexpected Http Driver Exception') + raise OperationalError(f'Error {ex} executing HTTP request attempt {attempts} {self.url}') from ex + finally: + if query_session: + self._active_session = None # Make sure we always clear this + if 200 <= response.status < 300: + return response + if response.status in (429, 503, 504): + if attempts > retries: + self._error_handler(response, True) + logger.debug('Retrying requests with status code %d', response.status) + elif error_handler: + error_handler(response) + else: + self._error_handler(response) + + def ping(self): + """ + See BaseClient doc_string for this method + """ + try: + response = self.http.request('GET', f'{self.url}/ping', timeout=3) + return 200 <= response.status < 300 + except HTTPError: + logger.debug('ping failed', exc_info=True) + return False + + def raw_query(self, query: str, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, fmt: str = None, + use_database: bool = True, external_data: Optional[ExternalData] = None) -> bytes: + """ + See BaseClient doc_string for this method + """ + final_query, bind_params = bind_query(query, parameters, self.server_tz) + if fmt: + final_query += f'\n FORMAT {fmt}' + params = self._validate_settings(settings or {}) + if use_database and self.database: + params['database'] = self.database + params.update(bind_params) + if external_data: + body = bytes() + params['query'] = final_query + params.update(external_data.query_params) + fields = external_data.form_data + else: + body = final_query + fields = None + return self._raw_request(body, params, fields=fields).data + + def close(self): + if self._owns_pool_manager: + self.http.clear() + all_managers.pop(self.http, None) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py new file mode 100644 index 0000000000..9bb8e26508 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py @@ -0,0 +1,226 @@ +import atexit +import http +import logging +import multiprocessing +import os +import sys +import socket +import time +from typing import Dict, Any, Optional + +import certifi +import lz4.frame +import urllib3 +import zstandard +from urllib3.poolmanager import PoolManager, ProxyManager +from urllib3.response import HTTPResponse + +from clickhouse_connect.driver.exceptions import ProgrammingError +from clickhouse_connect import common + +logger = logging.getLogger(__name__) + +# We disable this warning. 
Verify must explicitly set to false, so we assume the user knows what they're doing +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +# Increase this number just to be safe when ClickHouse is returning progress headers +http.client._MAXHEADERS = 10000 # pylint: disable=protected-access + +DEFAULT_KEEP_INTERVAL = 30 +DEFAULT_KEEP_COUNT = 3 +DEFAULT_KEEP_IDLE = 30 + +SOCKET_TCP = socket.IPPROTO_TCP + +core_socket_options = [ + (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), + (SOCKET_TCP, socket.TCP_NODELAY, 1), + (socket.SOL_SOCKET, socket.SO_SNDBUF, 1024 * 256), + (socket.SOL_SOCKET, socket.SO_SNDBUF, 1024 * 256) +] + +logging.getLogger('urllib3').setLevel(logging.WARNING) +_proxy_managers = {} +all_managers = {} + + +@atexit.register +def close_managers(): + for manager in all_managers: + manager.clear() + + +# pylint: disable=no-member,too-many-arguments,too-many-branches +def get_pool_manager_options(keep_interval: int = DEFAULT_KEEP_INTERVAL, + keep_count: int = DEFAULT_KEEP_COUNT, + keep_idle: int = DEFAULT_KEEP_IDLE, + ca_cert: str = None, + verify: bool = True, + client_cert: str = None, + client_cert_key: str = None, + **options) -> Dict[str, Any]: + socket_options = core_socket_options.copy() + if getattr(socket, 'TCP_KEEPINTVL', None) is not None: + socket_options.append((SOCKET_TCP, socket.TCP_KEEPINTVL, keep_interval)) + if getattr(socket, 'TCP_KEEPCNT', None) is not None: + socket_options.append((SOCKET_TCP, socket.TCP_KEEPCNT, keep_count)) + if getattr(socket, 'TCP_KEEPIDLE', None) is not None: + socket_options.append((SOCKET_TCP, socket.TCP_KEEPIDLE, keep_idle)) + if sys.platform == 'darwin': + socket_options.append((SOCKET_TCP, getattr(socket, 'TCP_KEEPALIVE', 0x10), keep_interval)) + options['maxsize'] = options.get('maxsize', 8) + options['retries'] = options.get('retries', 1) + if ca_cert == 'certifi': + ca_cert = certifi.where() + options['cert_reqs'] = 'CERT_REQUIRED' if verify else 'CERT_NONE' + if ca_cert: + options['ca_certs'] = ca_cert + if client_cert: + options['cert_file'] = client_cert + if client_cert_key: + options['key_file'] = client_cert_key + options['socket_options'] = socket_options + options['block'] = options.get('block', False) + return options + + +def get_pool_manager(keep_interval: int = DEFAULT_KEEP_INTERVAL, + keep_count: int = DEFAULT_KEEP_COUNT, + keep_idle: int = DEFAULT_KEEP_IDLE, + ca_cert: str = None, + verify: bool = True, + client_cert: str = None, + client_cert_key: str = None, + http_proxy: str = None, + https_proxy: str = None, + **options): + options = get_pool_manager_options(keep_interval, + keep_count, + keep_idle, + ca_cert, + verify, + client_cert, + client_cert_key, + **options) + if http_proxy: + if https_proxy: + raise ProgrammingError('Only one of http_proxy or https_proxy should be specified') + if not http_proxy.startswith('http'): + http_proxy = f'http://{http_proxy}' + manager = ProxyManager(http_proxy, **options) + elif https_proxy: + if not https_proxy.startswith('http'): + https_proxy = f'https://{https_proxy}' + manager = ProxyManager(https_proxy, **options) + else: + manager = PoolManager(**options) + all_managers[manager] = int(time.time()) + return manager + + +def check_conn_reset(manager: PoolManager): + reset_seconds = common.get_setting('max_connection_age') + if reset_seconds: + last_reset = all_managers.get(manager, 0) + now = int(time.time()) + if last_reset < now - reset_seconds: + logger.debug('connection reset') + manager.clear() + all_managers[manager] = now + + +def 
get_proxy_manager(host: str, http_proxy): + key = f'{host}__{http_proxy}' + if key in _proxy_managers: + return _proxy_managers[key] + proxy_manager = get_pool_manager(http_proxy=http_proxy) + _proxy_managers[key] = proxy_manager + return proxy_manager + + +def get_response_data(response: HTTPResponse) -> bytes: + encoding = response.headers.get('content-encoding', None) + if encoding == 'zstd': + try: + zstd_decom = zstandard.ZstdDecompressor() + return zstd_decom.stream_reader(response.data).read() + except zstandard.ZstdError: + pass + if encoding == 'lz4': + lz4_decom = lz4.frame.LZ4FrameDecompressor() + return lz4_decom.decompress(response.data, len(response.data)) + return response.data + + +def check_env_proxy(scheme: str, host: str, port: int) -> Optional[str]: + env_var = f'{scheme}_proxy'.lower() + proxy = os.environ.get(env_var) + if not proxy: + proxy = os.environ.get(env_var.upper()) + if not proxy: + return None + no_proxy = os.environ.get('no_proxy') + if not no_proxy: + no_proxy = os.environ.get('NO_PROXY') + if not no_proxy: + return proxy + if no_proxy == '*': + return None # Wildcard no proxy means don't actually proxy anything + host = host.lower() + for name in no_proxy.split(','): + name = name.strip() + if name: + name = name.lstrip('.').lower() + if name in (host, f'{host}:{port}'): + return None # Host or host/port matches + if host.endswith('.' + name): + return None # Domain matches + return proxy + + +_default_pool_manager = get_pool_manager() + + +def default_pool_manager(): + if multiprocessing.current_process().name == 'MainProcess': + return _default_pool_manager + # PoolManagers don't seem to be safe for some multiprocessing environments, always return a new one + return get_pool_manager() + + +class ResponseSource: + def __init__(self, response: HTTPResponse, chunk_size: int = 1024 * 1024): + self.response = response + compression = response.headers.get('content-encoding') + if compression == 'zstd': + zstd_decom = zstandard.ZstdDecompressor().decompressobj() + + def decompress(): + while True: + chunk = response.read(chunk_size, decode_content=False) + if not chunk: + break + yield zstd_decom.decompress(chunk) + + self.gen = decompress() + elif compression == 'lz4': + lz4_decom = lz4.frame.LZ4FrameDecompressor() + + def decompress(): + while lz4_decom.needs_input: + data = self.response.read(chunk_size, decode_content=False) + if lz4_decom.unused_data: + data = lz4_decom.unused_data + data + if not data: + return + chunk = lz4_decom.decompress(data) + if chunk: + yield chunk + + self.gen = decompress() + else: + self.gen = response.stream(amt=chunk_size, decode_content=True) + + def close(self): + self.response.drain_conn() + self.response.close() diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/insert.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/insert.py new file mode 100644 index 0000000000..c7861b3077 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/insert.py @@ -0,0 +1,199 @@ +import logging +from math import log +from typing import Iterable, Sequence, Optional, Any, Dict, NamedTuple, Generator, Union, TYPE_CHECKING + +from clickhouse_connect.driver.query import quote_identifier + +from clickhouse_connect.driver.ctypes import data_conv +from clickhouse_connect.driver.context import BaseQueryContext +from clickhouse_connect.driver.options import np, pd, pd_time_test +from clickhouse_connect.driver.exceptions import ProgrammingError + +if TYPE_CHECKING: + from 
clickhouse_connect.datatypes.base import ClickHouseType + +logger = logging.getLogger(__name__) +DEFAULT_BLOCK_BYTES = 1 << 21 # Try to generate blocks between 1MB and 2MB in raw size + + +class InsertBlock(NamedTuple): + prefix: bytes + column_count: int + row_count: int + column_names: Iterable[str] + column_types: Iterable['ClickHouseType'] + column_data: Iterable[Sequence[Any]] + + +# pylint: disable=too-many-instance-attributes +class InsertContext(BaseQueryContext): + """ + Reusable Argument/parameter object for inserts. + """ + + # pylint: disable=too-many-arguments + def __init__(self, + table: str, + column_names: Sequence[str], + column_types: Sequence['ClickHouseType'], + data: Any = None, + column_oriented: Optional[bool] = None, + settings: Optional[Dict[str, Any]] = None, + compression: Optional[Union[str, bool]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + block_size: Optional[int] = None): + super().__init__(settings, query_formats, column_formats) + self.table = table + self.column_names = column_names + self.column_types = column_types + self.column_oriented = False if column_oriented is None else column_oriented + self.compression = compression + self.req_block_size = block_size + self.block_row_count = DEFAULT_BLOCK_BYTES + self.data = data + self.insert_exception = None + + @property + def empty(self) -> bool: + return self._data is None + + @property + def data(self): + return self._raw_data + + @data.setter + def data(self, data: Any): + self._raw_data = data + self.current_block = 0 + self.current_row = 0 + self.row_count = 0 + self.column_count = 0 + self._data = None + if data is None or len(data) == 0: + return + if pd and isinstance(data, pd.DataFrame): + data = self._convert_pandas(data) + self.column_oriented = True + if np and isinstance(data, np.ndarray): + data = self._convert_numpy(data) + if self.column_oriented: + self._next_block_data = self._column_block_data + self._block_columns = data # [SliceView(column) for column in data] + self._block_rows = None + self.column_count = len(data) + self.row_count = len(data[0]) + else: + self._next_block_data = self._row_block_data + self._block_rows = data + self._block_columns = None + self.row_count = len(data) + self.column_count = len(data[0]) + if self.row_count and self.column_count: + if self.column_count != len(self.column_names): + raise ProgrammingError('Insert data column count does not match column names') + self._data = data + self.block_row_count = self._calc_block_size() + + def _calc_block_size(self) -> int: + if self.req_block_size: + return self.req_block_size + row_size = 0 + sample_size = min((log(self.row_count) + 1) * 2, 64) + sample_freq = max(1, int(self.row_count / sample_size)) + for i, d_type in enumerate(self.column_types): + if d_type.byte_size: + row_size += d_type.byte_size + continue + if self.column_oriented: + col_data = self._data[i] + if sample_freq == 1: + d_size = d_type.data_size(col_data) + else: + sample = [col_data[j] for j in range(0, self.row_count, sample_freq)] + d_size = d_type.data_size(sample) + else: + data = self._data + sample = [data[j][i] for j in range(0, self.row_count, sample_freq)] + d_size = d_type.data_size(sample) + row_size += d_size + return 1 << (21 - int(log(row_size, 2))) + + def next_block(self) -> Generator[InsertBlock, None, None]: + while True: + block_end = min(self.current_row + self.block_row_count, self.row_count) + row_count = block_end - 
self.current_row + if row_count <= 0: + return + if self.current_block == 0: + cols = f" ({', '.join([quote_identifier(x) for x in self.column_names])})" + prefix = f'INSERT INTO {self.table}{cols} FORMAT Native\n'.encode() + else: + prefix = bytes() + self.current_block += 1 + data = self._next_block_data(self.current_row, block_end) + yield InsertBlock(prefix, self.column_count, row_count, self.column_names, self.column_types, data) + self.current_row = block_end + + def _column_block_data(self, block_start, block_end): + if block_start == 0 and self.row_count <= block_end: + return self._block_columns # Optimization if we don't need to break up the block + return [col[block_start: block_end] for col in self._block_columns] + + def _row_block_data(self, block_start, block_end): + return data_conv.pivot(self._block_rows, block_start, block_end) + + def _convert_pandas(self, df): + data = [] + for df_col_name, col_name, ch_type in zip(df.columns, self.column_names, self.column_types): + df_col = df[df_col_name] + d_type = str(df_col.dtype) + if ch_type.python_type == int: + if 'float' in d_type: + df_col = df_col.round().astype(ch_type.base_type, copy=False) + else: + df_col = df_col.astype(ch_type.base_type, copy=False) + elif 'datetime' in ch_type.np_type and (pd_time_test(df_col) or 'datetime64[ns' in d_type): + div = ch_type.nano_divisor + data.append([None if pd.isnull(x) else x.value // div for x in df_col]) + self.column_formats[col_name] = 'int' + continue + if ch_type.nullable: + if d_type == 'object': + # This is ugly, but the multiple replaces seem required as a result of this bug: + # https://github.com/pandas-dev/pandas/issues/29024 + df_col = df_col.replace({pd.NaT: None}).replace({np.nan: None}) + elif 'Float' in ch_type.base_type: + # This seems to be the only way to convert any null looking things to nan + df_col = df_col.astype(ch_type.np_type) + else: + df_col = df_col.replace({np.nan: None}) + data.append(df_col.to_numpy(copy=False)) + return data + + def _convert_numpy(self, np_array): + if np_array.dtype.names is None: + if 'date' in str(np_array.dtype): + for col_name, col_type in zip(self.column_names, self.column_types): + if 'date' in col_type.np_type: + self.column_formats[col_name] = 'int' + return np_array.astype('int').tolist() + for col_type in self.column_types: + if col_type.byte_size == 0 or col_type.byte_size > np_array.dtype.itemsize: + return np_array.tolist() + return np_array + + if set(self.column_names).issubset(set(np_array.dtype.names)): + data = [np_array[col_name] for col_name in self.column_names] + else: + # Column names don't match, so we have to assume they are in order + data = [np_array[col_name] for col_name in np_array.dtype.names] + for ix, (col_name, col_type) in enumerate(zip(self.column_names, self.column_types)): + d_type = data[ix].dtype + if 'date' in str(d_type) and 'date' in col_type.np_type: + self.column_formats[col_name] = 'int' + data[ix] = data[ix].astype(int).tolist() + elif col_type.byte_size == 0 or col_type.byte_size > d_type.itemsize: + data[ix] = data[ix].tolist() + self.column_oriented = True + return data diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/models.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/models.py new file mode 100644 index 0000000000..054c7b686a --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/models.py @@ -0,0 +1,37 @@ +from typing import NamedTuple + +from clickhouse_connect.datatypes.registry import get_from_name + + 
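+# Usage sketch (illustrative only, assuming a reachable server and the package-level
+# clickhouse_connect.get_client() helper): a ColumnDef can be built from one row of
+# DESCRIBE TABLE output, whose seven standard columns match the fields below in order:
+#
+#   import clickhouse_connect
+#   client = clickhouse_connect.get_client()
+#   rows = client.query('DESCRIBE TABLE system.tables').result_rows
+#   column_defs = [ColumnDef(*row[:7]) for row in rows]
+#   print(column_defs[0].name, column_defs[0].ch_type)
+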
+class ColumnDef(NamedTuple): + """ + ClickHouse column definition from DESCRIBE TABLE command + """ + name: str + type: str + default_type: str + default_expression: str + comment: str + codec_expression: str + ttl_expression: str + + @property + def ch_type(self): + return get_from_name(self.type) + + +class SettingDef(NamedTuple): + """ + ClickHouse setting definition from system.settings table + """ + name: str + value: str + readonly: int + + +class SettingStatus(NamedTuple): + """ + Get the setting "status" from a ClickHouse server setting + """ + is_set: bool + is_writable: bool diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/npconv.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/npconv.py new file mode 100644 index 0000000000..df99550d34 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/npconv.py @@ -0,0 +1,9 @@ +from clickhouse_connect.driver.options import np + +from clickhouse_connect.driver.types import ByteSource + + +def read_numpy_array(source: ByteSource, np_type: str, num_rows: int): + dtype = np.dtype(np_type) + buffer = source.read_bytes(dtype.itemsize * num_rows) + return np.frombuffer(buffer, dtype, num_rows) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/npquery.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/npquery.py new file mode 100644 index 0000000000..1c063e4082 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/npquery.py @@ -0,0 +1,132 @@ +import logging +from typing import Generator, Sequence, Tuple + +from clickhouse_connect.driver.common import empty_gen, StreamContext +from clickhouse_connect.driver.exceptions import StreamClosedError +from clickhouse_connect.driver.types import Closable +from clickhouse_connect.driver.options import np, pd + +logger = logging.getLogger(__name__) + + +# pylint: disable=too-many-instance-attributes +class NumpyResult(Closable): + def __init__(self, + block_gen: Generator[Sequence, None, None] = None, + column_names: Tuple = (), + column_types: Tuple = (), + d_types: Sequence = (), + source: Closable = None): + self.column_names = column_names + self.column_types = column_types + self.np_types = d_types + self.source = source + self.query_id = '' + self.summary = {} + self._block_gen = block_gen or empty_gen() + self._numpy_result = None + self._df_result = None + + def _np_stream(self) -> Generator: + if self._block_gen is None: + raise StreamClosedError + + block_gen = self._block_gen + self._block_gen = None + if not self.np_types: + return block_gen + + d_types = self.np_types + first_type = d_types[0] + if first_type != np.object_ and all(np.dtype(np_type) == first_type for np_type in d_types): + self.np_types = first_type + + def numpy_blocks(): + for block in block_gen: + yield np.array(block, first_type).transpose() + else: + if any(x == np.object_ for x in d_types): + self.np_types = [np.object_] * len(self.np_types) + self.np_types = np.dtype(list(zip(self.column_names, d_types))) + + def numpy_blocks(): + for block in block_gen: + np_array = np.empty(len(block[0]), dtype=self.np_types) + for col_name, data in zip(self.column_names, block): + np_array[col_name] = data + yield np_array + + return numpy_blocks() + + def _df_stream(self) -> Generator: + if self._block_gen is None: + raise StreamClosedError + block_gen = self._block_gen + + def pd_blocks(): + for block in block_gen: + yield pd.DataFrame(dict(zip(self.column_names, block))) + + self._block_gen = None + return 
pd_blocks() + + def close_numpy(self): + if not self._block_gen: + raise StreamClosedError + chunk_size = 4 + pieces = [] + blocks = [] + for block in self._np_stream(): + blocks.append(block) + if len(blocks) == chunk_size: + pieces.append(np.concatenate(blocks, dtype=self.np_types)) + chunk_size *= 2 + blocks = [] + pieces.extend(blocks) + if len(pieces) > 1: + self._numpy_result = np.concatenate(pieces, dtype=self.np_types) + elif len(pieces) == 1: + self._numpy_result = pieces[0] + else: + self._numpy_result = np.empty((0,)) + self.close() + return self + + def close_df(self): + pieces = list(self._df_stream()) + if len(pieces) > 1: + self._df_result = pd.concat(pieces, ignore_index=True) + elif len(pieces) == 1: + self._df_result = pieces[0] + else: + self._df_result = pd.DataFrame() + self.close() + return self + + @property + def np_result(self): + if self._numpy_result is None: + self.close_numpy() + return self._numpy_result + + @property + def df_result(self): + if self._df_result is None: + self.close_df() + return self._df_result + + @property + def np_stream(self) -> StreamContext: + return StreamContext(self, self._np_stream()) + + @property + def df_stream(self) -> StreamContext: + return StreamContext(self, self._df_stream()) + + def close(self): + if self._block_gen is not None: + self._block_gen.close() + self._block_gen = None + if self.source: + self.source.close() + self.source = None diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/options.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/options.py new file mode 100644 index 0000000000..4cec665c03 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/options.py @@ -0,0 +1,52 @@ +from clickhouse_connect.driver.exceptions import NotSupportedError + +pd_time_test = None +pd_extended_dtypes = False + +try: + import numpy as np +except ImportError: + np = None + +try: + import pandas as pd + pd_extended_dtypes = not pd.__version__.startswith('0') + try: + from pandas.core.dtypes.common import is_datetime64_dtype + from pandas.core.dtypes.common import is_timedelta64_dtype + + def combined_test(arr_or_dtype): + return is_datetime64_dtype(arr_or_dtype) or is_timedelta64_dtype(arr_or_dtype) + + pd_time_test = combined_test + except ImportError: + try: + from pandas.core.dtypes.common import is_datetime_or_timedelta_dtype + pd_time_test = is_datetime_or_timedelta_dtype + except ImportError as ex: + raise NotSupportedError('pandas version does not contain expected test for temporal types') from ex +except ImportError: + pd = None + +try: + import pyarrow as arrow +except ImportError: + arrow = None + + +def check_numpy(): + if np: + return np + raise NotSupportedError('Numpy package is not installed') + + +def check_pandas(): + if pd: + return pd + raise NotSupportedError('Pandas package is not installed') + + +def check_arrow(): + if arrow: + return arrow + raise NotSupportedError('PyArrow package is not installed') diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/parser.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/parser.py new file mode 100644 index 0000000000..a158e7f999 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/parser.py @@ -0,0 +1,166 @@ +from typing import Union, Tuple + +from clickhouse_connect.driver.common import unescape_identifier + + +# pylint: disable=too-many-branches +def parse_callable(expr) -> Tuple[str, Tuple[Union[str, int], ...], str]: + """ + Parses a single 
level ClickHouse optionally 'callable' function/identifier. The identifier is returned as the + first value in the response tuple. If the expression is callable -- i.e. an identifier followed by 0 or more + arguments in parentheses, the second returned value is a tuple of the comma separated arguments. The third and + final tuple value is any text remaining after the initial expression for further parsing/processing. + + Examples: + "Tuple(String, Enum('one' = 1, 'two' = 2))" will return "Tuple", ("String", "Enum('one' = 1,'two' = 2)"), "" + "MergeTree() PARTITION BY key" will return "MergeTree", (), "PARTITION BY key" + + :param expr: ClickHouse DDL or Column Name expression + :return: Tuple of the identifier, a tuple of arguments, and remaining text + """ + expr = expr.strip() + pos = expr.find('(') + space = expr.find(' ') + if pos == -1 and space == -1: + return expr, (), '' + if space != -1 and (pos == -1 or space < pos): + return expr[:space], (), expr[space:].strip() + name = expr[:pos] + pos += 1 # Skip first paren + values = [] + value = '' + in_str = False + level = 0 + + def add_value(): + try: + values.append(int(value)) + except ValueError: + values.append(value) + + while True: + char = expr[pos] + pos += 1 + if in_str: + value += char + if char == "'": + in_str = False + elif char == '\\' and expr[pos] == "'" and expr[pos:pos + 4] != "' = " and expr[pos:pos + 2] != "')": + value += expr[pos] + pos += 1 + else: + if level == 0: + if char == ' ': + space = pos + temp_char = expr[space] + while temp_char == ' ': + space += 1 + temp_char = expr[space] + if not value or temp_char in "()',=><0": + char = temp_char + pos = space + 1 + if char == ',': + add_value() + value = '' + continue + if char == ')': + break + if char == "'" and (not value or 'Enum' in value): + in_str = True + elif char == '(': + level += 1 + elif char == ')' and level: + level -= 1 + value += char + if value != '': + add_value() + return name, tuple(values), expr[pos:].strip() + + +def parse_enum(expr) -> Tuple[Tuple[str], Tuple[int]]: + """ + Parse a ClickHouse enum definition expression of the form ('key1' = 1, 'key2' = 2) + :param expr: ClickHouse enum expression/arguments + :return: Parallel tuples of string enum keys and integer enum values + """ + keys = [] + values = [] + pos = expr.find('(') + 1 + in_key = False + key = [] + value = [] + while True: + char = expr[pos] + pos += 1 + if in_key: + if char == "'": + keys.append(''.join(key)) + key = [] + in_key = False + elif char == '\\' and expr[pos] == "'" and expr[pos:pos + 4] != "' = " and expr[pos:] != "')": + key.append(expr[pos]) + pos += 1 + else: + key.append(char) + elif char not in (' ', '='): + if char == ',': + values.append(int(''.join(value))) + value = [] + elif char == ')': + values.append(int(''.join(value))) + break + elif char == "'" and not value: + in_key = True + else: + value.append(char) + values, keys = zip(*sorted(zip(values, keys))) + return tuple(keys), tuple(values) + + +def parse_columns(expr: str): + """ + Parse a ClickHouse column list of the form (col1 String, col2 Array(Tuple(String, Int32))). This also handles + unnamed columns (such as Tuple definitions). Mixed named and unnamed columns are not currently supported. 
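+
+    Example:
+    "(col1 String, col2 Int64)" will return ('col1', 'col2'), ('String', 'Int64')
+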
+ :param expr: ClickHouse enum expression/arguments + :return: Parallel tuples of column types and column types (strings) + """ + names = [] + columns = [] + pos = 1 + named = False + level = 0 + label = '' + in_str = False + while True: + char = expr[pos] + pos += 1 + if in_str: + if "'" == char: + in_str = False + elif char == '\\' and expr[pos] == "'" and expr[pos:pos + 4] != "' = " and expr[pos:pos + 2] != "')": + label += expr[pos] + pos += 1 + else: + if level == 0: + if char == ' ': + if label and not named: + names.append(unescape_identifier(label)) + label = '' + named = True + char = '' + elif char == ',': + columns.append(label) + named = False + label = '' + continue + elif char == ')': + columns.append(label) + break + if char == "'" and (not label or 'Enum' in label): + in_str = True + elif char == '(': + level += 1 + elif char == ')': + level -= 1 + label += char + return tuple(names), tuple(columns) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/query.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/query.py new file mode 100644 index 0000000000..0b5086ae11 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/query.py @@ -0,0 +1,496 @@ +import ipaddress +import logging +import re +import uuid +import pytz + +from enum import Enum +from typing import Any, Tuple, Dict, Sequence, Optional, Union, Generator +from datetime import date, datetime, tzinfo + +from pytz.exceptions import UnknownTimeZoneError + +from clickhouse_connect import common +from clickhouse_connect.driver.common import dict_copy, empty_gen, StreamContext +from clickhouse_connect.driver.external import ExternalData +from clickhouse_connect.driver.types import Matrix, Closable +from clickhouse_connect.json_impl import any_to_json +from clickhouse_connect.driver.exceptions import StreamClosedError, ProgrammingError +from clickhouse_connect.driver.options import check_arrow, pd_extended_dtypes +from clickhouse_connect.driver.context import BaseQueryContext + +logger = logging.getLogger(__name__) +commands = 'CREATE|ALTER|SYSTEM|GRANT|REVOKE|CHECK|DETACH|DROP|DELETE|KILL|' + \ + 'OPTIMIZE|SET|RENAME|TRUNCATE|USE' + +limit_re = re.compile(r'\s+LIMIT($|\s)', re.IGNORECASE) +select_re = re.compile(r'(^|\s)SELECT\s', re.IGNORECASE) +insert_re = re.compile(r'(^|\s)INSERT\s*INTO', re.IGNORECASE) +command_re = re.compile(r'(^\s*)(' + commands + r')\s', re.IGNORECASE) +external_bind_re = re.compile(r'{.+:.+}') + + +# pylint: disable=too-many-instance-attributes +class QueryContext(BaseQueryContext): + """ + Argument/parameter object for queries. 
This context is used to set thread/query specific formats + """ + + # pylint: disable=duplicate-code,too-many-arguments,too-many-locals + def __init__(self, + query: str = '', + parameters: Optional[Dict[str, Any]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + server_tz: tzinfo = pytz.UTC, + use_none: Optional[bool] = None, + column_oriented: Optional[bool] = None, + use_numpy: Optional[bool] = None, + max_str_len: Optional[int] = 0, + query_tz: Optional[Union[str, tzinfo]] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + use_extended_dtypes: Optional[bool] = None, + as_pandas: bool = False, + streaming: bool = False, + apply_server_tz: bool = False, + external_data: Optional[ExternalData] = None): + """ + Initializes various configuration settings for the query context + + :param query: Query string with Python style format value replacements + :param parameters: Optional dictionary of substitution values + :param settings: Optional ClickHouse settings for the query + :param query_formats: Optional dictionary of query formats with the key of a ClickHouse type name + (with * wildcards) and a value of valid query formats for those types. + The value 'encoding' can be sent to change the expected encoding for this query, with a value of + the desired encoding such as `latin-1` + :param column_formats: Optional dictionary of column specific formats. The key is the column name, + The value is either the format for the data column (such as 'string' for a UUID column) or a + second level "format" dictionary of a ClickHouse type name and a value of query formats. This + secondary dictionary can be used for nested column types such as Tuples or Maps + :param encoding: Optional string encoding for this query, such as 'latin-1' + :param column_formats: Optional dictionary + :param use_none: Use a Python None for ClickHouse NULL values in nullable columns. Otherwise the default + value of the column (such as 0 for numbers) will be returned in the result_set + :param max_str_len Limit returned ClickHouse String values to this length, which allows a Numpy + structured array even with ClickHouse variable length String columns. If 0, Numpy arrays for + String columns will always be object arrays + :param query_tz Either a string or a pytz tzinfo object. (Strings will be converted to tzinfo objects). + Values for any DateTime or DateTime64 column in the query will be converted to Python datetime.datetime + objects with the selected timezone + :param column_tzs A dictionary of column names to tzinfo objects (or strings that will be converted to + tzinfo objects). 
The timezone will be applied to datetime objects returned in the query + """ + super().__init__(settings, + query_formats, + column_formats, + encoding, + use_extended_dtypes if use_extended_dtypes is not None else False, + use_numpy if use_numpy is not None else False) + self.query = query + self.parameters = parameters or {} + self.use_none = True if use_none is None else use_none + self.column_oriented = False if column_oriented is None else column_oriented + self.use_numpy = use_numpy + self.max_str_len = 0 if max_str_len is None else max_str_len + self.server_tz = server_tz + self.apply_server_tz = apply_server_tz + self.external_data = external_data + if isinstance(query_tz, str): + try: + query_tz = pytz.timezone(query_tz) + except UnknownTimeZoneError as ex: + raise ProgrammingError(f'query_tz {query_tz} is not recognized') from ex + self.query_tz = query_tz + if column_tzs is not None: + for col_name, timezone in column_tzs.items(): + if isinstance(timezone, str): + try: + timezone = pytz.timezone(timezone) + column_tzs[col_name] = timezone + except UnknownTimeZoneError as ex: + raise ProgrammingError(f'column_tz {timezone} is not recognized') from ex + self.column_tzs = column_tzs + self.column_tz = None + self.response_tz = None + self.block_info = False + self.as_pandas = as_pandas + self.use_pandas_na = as_pandas and pd_extended_dtypes + self.streaming = streaming + self._update_query() + + @property + def is_select(self) -> bool: + return select_re.search(self.uncommented_query) is not None + + @property + def has_limit(self) -> bool: + return limit_re.search(self.uncommented_query) is not None + + @property + def is_insert(self) -> bool: + return insert_re.search(self.uncommented_query) is not None + + @property + def is_command(self) -> bool: + return command_re.search(self.uncommented_query) is not None + + def set_parameters(self, parameters: Dict[str, Any]): + self.parameters = parameters + self._update_query() + + def set_parameter(self, key: str, value: Any): + if not self.parameters: + self.parameters = {} + self.parameters[key] = value + self._update_query() + + def set_response_tz(self, response_tz: tzinfo): + self.response_tz = response_tz + + def start_column(self, name: str): + super().start_column(name) + if self.column_tzs and name in self.column_tzs: + self.column_tz = self.column_tzs[name] + else: + self.column_tz = None + + def active_tz(self, datatype_tz: Optional[tzinfo]): + if self.column_tz: + active_tz = self.column_tz + elif datatype_tz: + active_tz = datatype_tz + elif self.query_tz: + active_tz = self.query_tz + elif self.response_tz: + active_tz = self.response_tz + elif self.apply_server_tz: + active_tz = self.server_tz + else: + active_tz = self.local_tz + # Special case where if everything is UTC, including the local timezone, we use naive timezones + # for performance reasons + if active_tz == pytz.UTC and active_tz.utcoffset(datetime.now()) == self.local_tz.utcoffset(datetime.now()): + return None + return active_tz + + def updated_copy(self, + query: Optional[str] = None, + parameters: Optional[Dict[str, Any]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + server_tz: Optional[tzinfo] = None, + use_none: Optional[bool] = None, + column_oriented: Optional[bool] = None, + use_numpy: Optional[bool] = None, + max_str_len: Optional[int] = None, + query_tz: Optional[Union[str, tzinfo]] = 
None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + use_extended_dtypes: Optional[bool] = None, + as_pandas: bool = False, + streaming: bool = False, + external_data: Optional[ExternalData] = None) -> 'QueryContext': + """ + Creates Query context copy with parameters overridden/updated as appropriate. + """ + return QueryContext(query or self.query, + dict_copy(self.parameters, parameters), + dict_copy(self.settings, settings), + dict_copy(self.query_formats, query_formats), + dict_copy(self.column_formats, column_formats), + encoding if encoding else self.encoding, + server_tz if server_tz else self.server_tz, + self.use_none if use_none is None else use_none, + self.column_oriented if column_oriented is None else column_oriented, + self.use_numpy if use_numpy is None else use_numpy, + self.max_str_len if max_str_len is None else max_str_len, + self.query_tz if query_tz is None else query_tz, + self.column_tzs if column_tzs is None else column_tzs, + self.use_extended_dtypes if use_extended_dtypes is None else use_extended_dtypes, + as_pandas, + streaming, + self.apply_server_tz, + self.external_data if external_data is None else external_data) + + def _update_query(self): + self.final_query, self.bind_params = bind_query(self.query, self.parameters, self.server_tz) + self.uncommented_query = remove_sql_comments(self.final_query) + + +class QueryResult(Closable): + """ + Wrapper class for query return values and metadata + """ + + # pylint: disable=too-many-arguments + def __init__(self, + result_set: Matrix = None, + block_gen: Generator[Matrix, None, None] = None, + column_names: Tuple = (), + column_types: Tuple = (), + column_oriented: bool = False, + source: Closable = None, + query_id: str = None, + summary: Dict[str, Any] = None): + self._result_rows = result_set + self._result_columns = None + self._block_gen = block_gen or empty_gen() + self._in_context = False + self._query_id = query_id + self.column_names = column_names + self.column_types = column_types + self.column_oriented = column_oriented + self.source = source + self.summary = {} if summary is None else summary + + @property + def result_set(self) -> Matrix: + if self.column_oriented: + return self.result_columns + return self.result_rows + + @property + def result_columns(self) -> Matrix: + if self._result_columns is None: + result = [[] for _ in range(len(self.column_names))] + with self.column_block_stream as stream: + for block in stream: + for base, added in zip(result, block): + base.extend(added) + self._result_columns = result + return self._result_columns + + @property + def result_rows(self) -> Matrix: + if self._result_rows is None: + result = [] + with self.row_block_stream as stream: + for block in stream: + result.extend(block) + self._result_rows = result + return self._result_rows + + @property + def query_id(self) -> str: + query_id = self.summary.get('query_id') + if query_id: + return query_id + return self._query_id + + def _column_block_stream(self): + if self._block_gen is None: + raise StreamClosedError + block_stream = self._block_gen + self._block_gen = None + return block_stream + + def _row_block_stream(self): + for block in self._column_block_stream(): + yield list(zip(*block)) + + @property + def column_block_stream(self) -> StreamContext: + return StreamContext(self, self._column_block_stream()) + + @property + def row_block_stream(self): + return StreamContext(self, self._row_block_stream()) + + @property + def rows_stream(self) -> StreamContext: + def stream(): + for 
block in self._row_block_stream(): + for row in block: + yield row + + return StreamContext(self, stream()) + + def named_results(self) -> Generator[dict, None, None]: + for row in zip(*self.result_set) if self.column_oriented else self.result_set: + yield dict(zip(self.column_names, row)) + + @property + def row_count(self) -> int: + if self.column_oriented: + return 0 if len(self.result_set) == 0 else len(self.result_set[0]) + return len(self.result_set) + + @property + def first_item(self): + if self.column_oriented: + return {name: col[0] for name, col in zip(self.column_names, self.result_set)} + return dict(zip(self.column_names, self.result_set[0])) + + @property + def first_row(self): + if self.column_oriented: + return [col[0] for col in self.result_set] + return self.result_set[0] + + def close(self): + if self.source: + self.source.close() + self.source = None + if self._block_gen is not None: + self._block_gen.close() + self._block_gen = None + + +BS = '\\' +must_escape = (BS, '\'') + + +def quote_identifier(identifier: str): + first_char = identifier[0] + if first_char in ('`', '"') and identifier[-1] == first_char: + # Identifier is already quoted, assume that it's valid + return identifier + return f'`{identifier}`' + + +def finalize_query(query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]], + server_tz: Optional[tzinfo] = None) -> str: + if not parameters: + return query + if hasattr(parameters, 'items'): + return query % {k: format_query_value(v, server_tz) for k, v in parameters.items()} + return query % tuple(format_query_value(v) for v in parameters) + + +def bind_query(query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]], + server_tz: Optional[tzinfo] = None) -> Tuple[str, Dict[str, str]]: + if not parameters: + return query, {} + if external_bind_re.search(query) is None: + return finalize_query(query, parameters, server_tz), {} + return query, {f'param_{k}': format_bind_value(v, server_tz) for k, v in parameters.items()} + + +def format_str(value: str): + return f"'{escape_str(value)}'" + + +def escape_str(value: str): + return ''.join(f'{BS}{c}' if c in must_escape else c for c in value) + + +# pylint: disable=too-many-return-statements +def format_query_value(value: Any, server_tz: tzinfo = pytz.UTC): + """ + Format Python values in a ClickHouse query + :param value: Python object + :param server_tz: Server timezone for adjusting datetime values + :return: Literal string for python value + """ + if value is None: + return 'NULL' + if isinstance(value, str): + return format_str(value) + if isinstance(value, datetime): + if value.tzinfo is None: + value = value.replace(tzinfo=server_tz) + return f"'{value.strftime('%Y-%m-%d %H:%M:%S')}'" + if isinstance(value, date): + return f"'{value.isoformat()}'" + if isinstance(value, list): + return f"[{', '.join(format_query_value(x, server_tz) for x in value)}]" + if isinstance(value, tuple): + return f"({', '.join(format_query_value(x, server_tz) for x in value)})" + if isinstance(value, dict): + if common.get_setting('dict_parameter_format') == 'json': + return format_str(any_to_json(value).decode()) + pairs = [format_query_value(k, server_tz) + ':' + format_query_value(v, server_tz) + for k, v in value.items()] + return f"{{{', '.join(pairs)}}}" + if isinstance(value, Enum): + return format_query_value(value.value, server_tz) + if isinstance(value, (uuid.UUID, ipaddress.IPv4Address, ipaddress.IPv6Address)): + return f"'{value}'" + return str(value) + + +# pylint: disable=too-many-branches +def 
format_bind_value(value: Any, server_tz: tzinfo = pytz.UTC, top_level: bool = True): + """ + Format Python values in a ClickHouse query + :param value: Python object + :param server_tz: Server timezone for adjusting datetime values + :param top_level: Flag for top level for nested structures + :return: Literal string for python value + """ + + def recurse(x): + return format_bind_value(x, server_tz, False) + + if value is None: + return '\\N' + if isinstance(value, str): + if top_level: + # At the top levels, strings must not be surrounded by quotes + return escape_str(value) + return format_str(value) + if isinstance(value, datetime): + if value.tzinfo is None: + value = value.replace(tzinfo=server_tz) + val = value.strftime('%Y-%m-%d %H:%M:%S') + if top_level: + return val + return f"'{val}'" + if isinstance(value, date): + if top_level: + return value.isoformat() + return f"'{value.isoformat()}'" + if isinstance(value, list): + return f"[{', '.join(recurse(x) for x in value)}]" + if isinstance(value, tuple): + return f"({', '.join(recurse(x) for x in value)})" + if isinstance(value, dict): + if common.get_setting('dict_parameter_format') == 'json': + return any_to_json(value).decode() + pairs = [recurse(k) + ':' + recurse(v) + for k, v in value.items()] + return f"{{{', '.join(pairs)}}}" + if isinstance(value, Enum): + return recurse(value.value) + return str(value) + + +comment_re = re.compile(r"(\".*?\"|\'.*?\')|(/\*.*?\*/|(--\s)[^\n]*$)", re.MULTILINE | re.DOTALL) + + +def remove_sql_comments(sql: str) -> str: + """ + Remove SQL comments. This is useful to determine the type of SQL query, such as SELECT or INSERT, but we + don't fully trust it to correctly ignore weird quoted strings, and other edge cases, so we always pass the + original SQL to ClickHouse (which uses a full-fledged AST/ token parser) + :param sql: SQL query + :return: SQL Query without SQL comments + """ + + def replacer(match): + # if the 2nd group (capturing comments) is not None, it means we have captured a + # non-quoted, actual comment string, so return nothing to remove the comment + if match.group(2): + return '' + # Otherwise we've actually captured a quoted string, so return it + return match.group(1) + + return comment_re.sub(replacer, sql) + + +def to_arrow(content: bytes): + pyarrow = check_arrow() + reader = pyarrow.ipc.RecordBatchFileReader(content) + return reader.read_all() + + +def arrow_buffer(table) -> Tuple[Sequence[str], bytes]: + pyarrow = check_arrow() + sink = pyarrow.BufferOutputStream() + with pyarrow.RecordBatchFileWriter(sink, table.schema) as writer: + writer.write(table) + return table.schema.names, sink.getvalue() diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/summary.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/summary.py new file mode 100644 index 0000000000..ef152cad76 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/summary.py @@ -0,0 +1,39 @@ +from typing import Optional + +from clickhouse_connect.datatypes.registry import get_from_name + +from clickhouse_connect.driver.query import QueryResult + + +class QuerySummary: + summary = {} + + def __init__(self, summary: Optional[dict] = None): + if summary is not None: + self.summary = summary + + @property + def written_rows(self) -> int: + return int(self.summary.get('written_rows', 0)) + + def written_bytes(self) -> int: + return int(self.summary.get('written_bytes', 0)) + + def query_id(self) -> str: + return self.summary.get('query_id', '') + + def 
as_query_result(self) -> QueryResult: + data = [] + column_names = [] + column_types = [] + str_type = get_from_name('String') + int_type = get_from_name('Int64') + for key, value in self.summary.items(): + column_names.append(key) + if value.isnumeric(): + data.append(int(value)) + column_types.append(int_type) + else: + data.append(value) + column_types.append(str_type) + return QueryResult([data], column_names=tuple(column_names), column_types=tuple(column_types)) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/tools.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/tools.py new file mode 100644 index 0000000000..420686cd64 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/tools.py @@ -0,0 +1,28 @@ +from typing import Optional, Sequence, Dict, Any + +from clickhouse_connect.driver import Client +from clickhouse_connect.driver.summary import QuerySummary +from clickhouse_connect.driver.query import quote_identifier + + +def insert_file(client: Client, + table: str, + file_path: str, + fmt: Optional[str] = None, + column_names: Optional[Sequence[str]] = None, + database: Optional[str] = None, + settings: Optional[Dict[str, Any]] = None, + compression: Optional[str] = None) -> QuerySummary: + full_table = f'{quote_identifier(database)}.{quote_identifier(table)}' if database else quote_identifier(table) + if not fmt: + fmt = 'CSV' if column_names else 'CSVWithNames' + if compression is None: + if file_path.endswith('.gzip') or file_path.endswith('.gz'): + compression = 'gzip' + with open(file_path, 'rb') as file: + return client.raw_insert(full_table, + column_names=column_names, + insert_block=file, + fmt=fmt, + settings=settings, + compression=compression) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/transform.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/transform.py new file mode 100644 index 0000000000..e781f63179 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/transform.py @@ -0,0 +1,118 @@ +import logging +from typing import Union + +from clickhouse_connect.datatypes import registry +from clickhouse_connect.driver.common import write_leb128 +from clickhouse_connect.driver.exceptions import StreamCompleteException, StreamFailureError +from clickhouse_connect.driver.insert import InsertContext +from clickhouse_connect.driver.npquery import NumpyResult +from clickhouse_connect.driver.query import QueryResult, QueryContext +from clickhouse_connect.driver.types import ByteSource +from clickhouse_connect.driver.compression import get_compressor + +_EMPTY_CTX = QueryContext() + +logger = logging.getLogger(__name__) + + +class NativeTransform: + # pylint: disable=too-many-locals + @staticmethod + def parse_response(source: ByteSource, context: QueryContext = _EMPTY_CTX) -> Union[NumpyResult, QueryResult]: + names = [] + col_types = [] + block_num = 0 + + def get_block(): + nonlocal block_num + result_block = [] + try: + try: + if context.block_info: + source.read_bytes(8) + num_cols = source.read_leb128() + except StreamCompleteException: + return None + num_rows = source.read_leb128() + for col_num in range(num_cols): + name = source.read_leb128_str() + type_name = source.read_leb128_str() + if block_num == 0: + names.append(name) + col_type = registry.get_from_name(type_name) + col_types.append(col_type) + else: + col_type = col_types[col_num] + if num_rows == 0: + result_block.append(tuple()) + else: + context.start_column(name) + column = 
col_type.read_column(source, num_rows, context) + result_block.append(column) + except Exception as ex: + source.close() + if isinstance(ex, StreamCompleteException): + # We ran out of data before it was expected, this could be ClickHouse reporting an error + # in the response + message = source.last_message + if len(message) > 1024: + message = message[-1024:] + error_start = message.find('Code: ') + if error_start != -1: + message = message[error_start:] + raise StreamFailureError(message) from None + raise + block_num += 1 + return result_block + + first_block = get_block() + if first_block is None: + return NumpyResult() if context.use_numpy else QueryResult([]) + + def gen(): + yield first_block + while True: + next_block = get_block() + if next_block is None: + return + yield next_block + + if context.use_numpy: + res_types = [col.dtype if hasattr(col, 'dtype') else 'O' for col in first_block] + return NumpyResult(gen(), tuple(names), tuple(col_types), res_types, source) + return QueryResult(None, gen(), tuple(names), tuple(col_types), context.column_oriented, source) + + @staticmethod + def build_insert(context: InsertContext): + compressor = get_compressor(context.compression) + + def chunk_gen(): + for block in context.next_block(): + output = bytearray() + output += block.prefix + write_leb128(block.column_count, output) + write_leb128(block.row_count, output) + for col_name, col_type, data in zip(block.column_names, block.column_types, block.column_data): + write_leb128(len(col_name), output) + output += col_name.encode() + write_leb128(len(col_type.name), output) + output += col_type.name.encode() + context.start_column(col_name) + try: + col_type.write_column(data, output, context) + except Exception as ex: # pylint: disable=broad-except + # This is hideous, but some low level serializations can fail while streaming + # the insert if the user has included bad data in the column. 
We need to ensure that the
+ # insert fails (using garbage data) to avoid a partial insert, and use the context to
+ # propagate the correct exception to the user
+ logger.error('Error serializing column `%s` into data type `%s`',
+ col_name, col_type.name, exc_info=True)
+ context.insert_exception = ex
+ yield 'INTERNAL EXCEPTION WHILE SERIALIZING'.encode()
+ return
+ yield compressor.compress_block(output)
+ footer = compressor.flush()
+ if footer:
+ yield footer
+
+ return chunk_gen()
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/types.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/types.py
new file mode 100644
index 0000000000..015e162fbe
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/types.py
@@ -0,0 +1,50 @@
+from abc import ABC, abstractmethod
+from typing import Sequence, Any
+
+Matrix = Sequence[Sequence[Any]]
+
+
+class Closable(ABC):
+ @abstractmethod
+ def close(self):
+ pass
+
+
+class ByteSource(Closable):
+ last_message = None
+
+ @abstractmethod
+ def read_leb128(self) -> int:
+ pass
+
+ @abstractmethod
+ def read_leb128_str(self) -> str:
+ pass
+
+ @abstractmethod
+ def read_uint64(self) -> int:
+ pass
+
+ @abstractmethod
+ def read_bytes(self, sz: int) -> bytes:
+ pass
+
+ @abstractmethod
+ def read_str_col(self, num_rows: int, encoding: str, nullable: bool = False, null_obj: Any = None):
+ pass
+
+ @abstractmethod
+ def read_bytes_col(self, sz: int, num_rows: int):
+ pass
+
+ @abstractmethod
+ def read_fixed_str_col(self, sz: int, num_rows: int, encoding: str):
+ pass
+
+ @abstractmethod
+ def read_array(self, array_type: str, num_rows: int):
+ pass
+
+ @abstractmethod
+ def read_byte(self) -> int:
+ pass
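
The parameter handling in driver/query.py above supports two paths: client-side rendering through format_query_value when the query has no {name:Type} placeholders, and server-side binding when it does. The short sketch below is not part of the commit; it only exercises the helpers defined in this diff, the query text, table and parameter names are invented for illustration, and no ClickHouse server is needed because these functions are pure string formatting.

from datetime import datetime

import pytz

from clickhouse_connect.driver.query import bind_query, remove_sql_comments

# No {name:Type} placeholders: values are rendered client-side by
# format_query_value and substituted into the query text itself.
q1, bind1 = bind_query(
    "SELECT * FROM events WHERE ts > %(cutoff)s AND tag IN %(tags)s",
    {'cutoff': datetime(2023, 1, 1, tzinfo=pytz.UTC), 'tags': ('a', 'b')})
print(q1)     # ... WHERE ts > '2023-01-01 00:00:00' AND tag IN ('a', 'b')
print(bind1)  # {} -- nothing is sent as a separate bind parameter

# {name:Type} placeholders match external_bind_re, so the query text is left
# untouched and each value is shipped to the server as a param_* setting.
q2, bind2 = bind_query(
    "SELECT * FROM events WHERE ts > {cutoff:DateTime}",
    {'cutoff': datetime(2023, 1, 1)})
print(q2)     # query string unchanged
print(bind2)  # {'param_cutoff': '2023-01-01 00:00:00'}

# remove_sql_comments is only used to classify the statement (SELECT, INSERT,
# command, ...); the original SQL is still what gets sent to ClickHouse.
print(remove_sql_comments("-- leading comment\nSELECT 1 /* inline */"))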
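Similarly, tools.insert_file above wires an open file straight into Client.raw_insert, defaulting the format to CSVWithNames when no column names are given and inferring gzip compression from a .gz or .gzip suffix. A hypothetical usage sketch, assuming the package's top-level get_client helper (not among the files in this diff) and made-up host, database, table and file path:

import clickhouse_connect

from clickhouse_connect.driver.tools import insert_file

# Hypothetical connection details; adjust for a real server.
client = clickhouse_connect.get_client(host='localhost', username='default')

# Format defaults to CSVWithNames because column_names is omitted, and
# compression is inferred as gzip from the file suffix.
summary = insert_file(client, 'trips', '/data/trips.csv.gz', database='nyc')
print(summary.written_rows, summary.summary.get('query_id', ''))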