author    vitalyisaev <vitalyisaev@ydb.tech>  2023-11-14 09:58:56 +0300
committer vitalyisaev <vitalyisaev@ydb.tech>  2023-11-14 10:20:20 +0300
commit    c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
tree      cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/python/clickhouse-connect/clickhouse_connect/driver
parent    d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
download  ydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz
YQ Connector: move tests from yql to ydb (OSS)
Move the Connector tests folder from the yql directory to the ydb directory (synchronized with GitHub).
Diffstat (limited to 'contrib/python/clickhouse-connect/clickhouse_connect/driver')
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py | 118
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/buffer.py | 140
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py | 727
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/common.py | 206
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/compression.py | 77
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/constants.py | 2
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/context.py | 72
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/ctypes.py | 49
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/dataconv.py | 129
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/ddl.py | 28
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/exceptions.py | 84
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/external.py | 127
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py | 473
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py | 226
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/insert.py | 199
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/models.py | 37
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/npconv.py | 9
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/npquery.py | 132
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/options.py | 52
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/parser.py | 166
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/query.py | 496
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/summary.py | 39
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/tools.py | 28
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/transform.py | 118
-rw-r--r--  contrib/python/clickhouse-connect/clickhouse_connect/driver/types.py | 50
25 files changed, 3784 insertions, 0 deletions
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py
new file mode 100644
index 0000000000..1320c04213
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py
@@ -0,0 +1,118 @@
+from inspect import signature
+from typing import Optional, Union, Dict, Any
+from urllib.parse import urlparse, parse_qs
+
+import clickhouse_connect.driver.ctypes
+from clickhouse_connect.driver.client import Client
+from clickhouse_connect.driver.common import dict_copy
+from clickhouse_connect.driver.exceptions import ProgrammingError
+from clickhouse_connect.driver.httpclient import HttpClient
+
+
+# pylint: disable=too-many-arguments,too-many-locals,too-many-branches
+def create_client(*,
+ host: str = None,
+ username: str = None,
+ password: str = '',
+ database: str = '__default__',
+ interface: Optional[str] = None,
+ port: int = 0,
+ secure: Union[bool, str] = False,
+ dsn: Optional[str] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ generic_args: Optional[Dict[str, Any]] = None,
+ **kwargs) -> Client:
+ """
+ The preferred method to get a ClickHouse Connect Client instance
+
+ :param host: The hostname or IP address of the ClickHouse server. If not set, localhost will be used.
+ :param username: The ClickHouse username. If not set, the default ClickHouse user will be used.
+ :param password: The password for username.
+ :param database: The default database for the connection. If not set, ClickHouse Connect will use the
+ default database for username.
+ :param interface: Must be http or https. Defaults to http, or to https if port is set to 8443 or 443
+ :param port: The ClickHouse HTTP or HTTPS port. If not set will default to 8123, or to 8443 if secure=True
+ or interface=https.
+ :param secure: Use https/TLS. This overrides inferred values from the interface or port arguments.
+ :param dsn: A string in standard DSN (Data Source Name) format. Other connection values (such as host or user)
+ will be extracted from this string if not set otherwise.
+ :param settings: ClickHouse server settings to be used with the session/every request
+ :param generic_args: Used internally to parse DBAPI connection strings into keyword arguments and ClickHouse settings.
+ It is not recommended to use this parameter externally.
+
+    :param kwargs: Recognized keyword arguments (used by the HTTP client); see below
+
+ :param compress: Enable compression for ClickHouse HTTP inserts and query results. True will select the preferred
+ compression method (lz4). A str of 'lz4', 'zstd', 'brotli', or 'gzip' can be used to use a specific compression type
+ :param query_limit: Default LIMIT on returned rows. 0 means no limit
+ :param connect_timeout: Timeout in seconds for the http connection
+ :param send_receive_timeout: Read timeout in seconds for http connection
+ :param client_name: client_name prepended to the HTTP User Agent header. Set this to track client queries
+ in the ClickHouse system.query_log.
+ :param send_progress: Deprecated, has no effect. Previous functionality is now automatically determined
+ :param verify: Verify the server certificate in secure/https mode
+ :param ca_cert: If verify is True, the file path to Certificate Authority root to validate ClickHouse server
+ certificate, in .pem format. Ignored if verify is False. This is not necessary if the ClickHouse server
+    certificate is trusted by the operating system. To trust the list of "global" public root
+    certificates maintained by the Python 'certifi' package, set ca_cert to 'certifi'
+ :param client_cert: File path to a TLS Client certificate in .pem format. This file should contain any
+ applicable intermediate certificates
+ :param client_cert_key: File path to the private key for the Client Certificate. Required if the private key
+    is not included in the Client Certificate key file
+    :param session_id: ClickHouse session id. If not specified and the common setting 'autogenerate_session_id'
+    is True, the client will generate a UUID1 session id
+    :param pool_mgr: Optional urllib3 PoolManager for this client. Useful for creating separate connection
+    pools for multiple client endpoints for applications with many clients
+    :param http_proxy: HTTP proxy address. Equivalent to setting the HTTP_PROXY environment variable
+    :param https_proxy: HTTPS proxy address. Equivalent to setting the HTTPS_PROXY environment variable
+    :param server_host_name: The server host name that will be checked against the TLS certificate for
+    validity. This option can be used with an ssh tunnel or other indirect connection where the `host`
+    argument refers to the tunnel or proxy and not the actual ClickHouse server
+ :return: ClickHouse Connect Client instance
+ """
+ if dsn:
+ parsed = urlparse(dsn)
+ username = username or parsed.username
+ password = password or parsed.password
+ host = host or parsed.hostname
+ port = port or parsed.port
+ if parsed.path and (not database or database == '__default__'):
+ database = parsed.path[1:].split('/')[0]
+ database = database or parsed.path
+ kwargs.update(dict(parse_qs(parsed.query)))
+ use_tls = str(secure).lower() == 'true' or interface == 'https' or (not interface and port in (443, 8443))
+ if not host:
+ host = 'localhost'
+ if not interface:
+ interface = 'https' if use_tls else 'http'
+ port = port or default_port(interface, use_tls)
+ if username is None and 'user' in kwargs:
+ username = kwargs.pop('user')
+ if username is None and 'user_name' in kwargs:
+ username = kwargs.pop('user_name')
+ if password and username is None:
+ username = 'default'
+ if 'compression' in kwargs and 'compress' not in kwargs:
+ kwargs['compress'] = kwargs.pop('compression')
+ settings = settings or {}
+ if interface.startswith('http'):
+ if generic_args:
+ client_params = signature(HttpClient).parameters
+ for name, value in generic_args.items():
+ if name in client_params:
+ kwargs[name] = value
+ elif name == 'compression':
+ if 'compress' not in kwargs:
+ kwargs['compress'] = value
+ else:
+ if name.startswith('ch_'):
+ name = name[3:]
+ settings[name] = value
+ return HttpClient(interface, host, port, username, password, database, settings=settings, **kwargs)
+ raise ProgrammingError(f'Unrecognized client type {interface}')
+
+
+def default_port(interface: str, secure: bool):
+ if interface.startswith('http'):
+ return 8443 if secure else 8123
+ raise ValueError('Unrecognized ClickHouse interface')
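For orientation, here is a minimal usage sketch of the factory above (not part of the commit). It assumes a ClickHouse server reachable over HTTP on localhost with the default user; query() and result_rows come from the Client and QueryResult classes added elsewhere in this diff.

from clickhouse_connect.driver import create_client

# Connect over HTTP to a local server; interface and port are inferred by the
# logic above (8123 for http when no port is given).
client = create_client(host='localhost', username='default', password='')
print(client.query('SELECT version()').result_rows)

# DSN form: credentials, host, port and database are parsed from the URL when
# not passed explicitly.
client = create_client(dsn='http://default@localhost:8123/system')
client.close()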
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/buffer.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/buffer.py
new file mode 100644
index 0000000000..b50b9bb678
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/buffer.py
@@ -0,0 +1,140 @@
+import sys
+import array
+from typing import Any, Iterable
+
+from clickhouse_connect.driver.exceptions import StreamCompleteException
+from clickhouse_connect.driver.types import ByteSource
+
+must_swap = sys.byteorder == 'big'
+
+
+class ResponseBuffer(ByteSource):
+ slots = 'slice_sz', 'buf_loc', 'end', 'gen', 'buffer', 'slice'
+
+ def __init__(self, source):
+ self.slice_sz = 4096
+ self.buf_loc = 0
+ self.buf_sz = 0
+ self.source = source
+ self.gen = source.gen
+ self.buffer = bytes()
+
+ def read_bytes(self, sz: int):
+ if self.buf_loc + sz <= self.buf_sz:
+ self.buf_loc += sz
+ return self.buffer[self.buf_loc - sz: self.buf_loc]
+ # Create a temporary buffer that bridges two or more source chunks
+ bridge = bytearray(self.buffer[self.buf_loc: self.buf_sz])
+ self.buf_loc = 0
+ self.buf_sz = 0
+ while len(bridge) < sz:
+ chunk = next(self.gen, None)
+ if not chunk:
+ raise StreamCompleteException
+ x = len(chunk)
+ if len(bridge) + x <= sz:
+ bridge.extend(chunk)
+ else:
+ tail = sz - len(bridge)
+ bridge.extend(chunk[:tail])
+ self.buffer = chunk
+ self.buf_sz = x
+ self.buf_loc = tail
+ return bridge
+
+ def read_byte(self) -> int:
+ if self.buf_loc < self.buf_sz:
+ self.buf_loc += 1
+ return self.buffer[self.buf_loc - 1]
+ self.buf_sz = 0
+ self.buf_loc = 0
+ chunk = next(self.gen, None)
+ if not chunk:
+ raise StreamCompleteException
+ x = len(chunk)
+ if x > 1:
+ self.buffer = chunk
+ self.buf_loc = 1
+ self.buf_sz = x
+ return chunk[0]
+
+ def read_leb128(self) -> int:
+ sz = 0
+ shift = 0
+ while True:
+ b = self.read_byte()
+ sz += ((b & 0x7f) << shift)
+ if (b & 0x80) == 0:
+ return sz
+ shift += 7
+
+ def read_leb128_str(self) -> str:
+ sz = self.read_leb128()
+ return self.read_bytes(sz).decode()
+
+ def read_uint64(self) -> int:
+ return int.from_bytes(self.read_bytes(8), 'little', signed=False)
+
+ def read_str_col(self,
+ num_rows: int,
+ encoding: str,
+ nullable: bool = False,
+ null_obj: Any = None) -> Iterable[str]:
+ column = []
+ app = column.append
+ null_map = self.read_bytes(num_rows) if nullable else None
+ for ix in range(num_rows):
+ sz = 0
+ shift = 0
+ while True:
+ b = self.read_byte()
+ sz += ((b & 0x7f) << shift)
+ if (b & 0x80) == 0:
+ break
+ shift += 7
+ x = self.read_bytes(sz)
+ if null_map and null_map[ix]:
+ app(null_obj)
+ elif encoding:
+ try:
+ app(x.decode(encoding))
+ except UnicodeDecodeError:
+ app(x.hex())
+ else:
+ app(x)
+ return column
+
+ def read_bytes_col(self, sz: int, num_rows: int) -> Iterable[bytes]:
+ source = self.read_bytes(sz * num_rows)
+ return [bytes(source[x:x+sz]) for x in range(0, sz * num_rows, sz)]
+
+ def read_fixed_str_col(self, sz: int, num_rows: int, encoding: str) -> Iterable[str]:
+ source = self.read_bytes(sz * num_rows)
+ column = []
+ app = column.append
+ for ix in range(0, sz * num_rows, sz):
+ try:
+ app(str(source[ix: ix + sz], encoding).rstrip('\x00'))
+ except UnicodeDecodeError:
+ app(source[ix: ix + sz].hex())
+ return column
+
+ def read_array(self, array_type: str, num_rows: int) -> Iterable[Any]:
+ column = array.array(array_type)
+ sz = column.itemsize * num_rows
+ b = self.read_bytes(sz)
+ column.frombytes(b)
+ if must_swap:
+ column.byteswap()
+ return column
+
+ @property
+ def last_message(self):
+ if len(self.buffer) == 0:
+ return None
+ return self.buffer.decode()
+
+ def close(self):
+ if self.source:
+ self.source.close()
+ self.source = None
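To illustrate the LEB128 readers above, a small sketch (not part of the commit) that feeds ResponseBuffer from an in-memory chunk generator instead of an HTTP response; the SimpleNamespace stand-in for the source object is an assumption made only for this example.

from types import SimpleNamespace

from clickhouse_connect.driver.buffer import ResponseBuffer

# 0x96 0x01 is the LEB128 encoding of 150; 0x05 is the length prefix of the
# 5-byte string 'hello'. Splitting the data into two chunks exercises the
# chunk refill path in read_byte.
chunks = iter([b'\x96\x01', b'\x05hello'])
source = SimpleNamespace(gen=chunks, close=lambda: None)
buf = ResponseBuffer(source)
print(buf.read_leb128())      # 150
print(buf.read_leb128_str())  # hello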
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py
new file mode 100644
index 0000000000..beaea8e0d1
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py
@@ -0,0 +1,727 @@
+import io
+import logging
+from datetime import tzinfo, datetime
+
+import pytz
+
+from abc import ABC, abstractmethod
+from typing import Iterable, Optional, Any, Union, Sequence, Dict, Generator, BinaryIO
+from pytz.exceptions import UnknownTimeZoneError
+
+from clickhouse_connect import common
+from clickhouse_connect.common import version
+from clickhouse_connect.datatypes.registry import get_from_name
+from clickhouse_connect.datatypes.base import ClickHouseType
+from clickhouse_connect.driver.common import dict_copy, StreamContext, coerce_int, coerce_bool
+from clickhouse_connect.driver.constants import CH_VERSION_WITH_PROTOCOL, PROTOCOL_VERSION_WITH_LOW_CARD
+from clickhouse_connect.driver.exceptions import ProgrammingError, OperationalError
+from clickhouse_connect.driver.external import ExternalData
+from clickhouse_connect.driver.insert import InsertContext
+from clickhouse_connect.driver.summary import QuerySummary
+from clickhouse_connect.driver.models import ColumnDef, SettingDef, SettingStatus
+from clickhouse_connect.driver.query import QueryResult, to_arrow, QueryContext, arrow_buffer
+
+io.DEFAULT_BUFFER_SIZE = 1024 * 256
+logger = logging.getLogger(__name__)
+arrow_str_setting = 'output_format_arrow_string_as_string'
+
+
+# pylint: disable=too-many-public-methods, too-many-instance-attributes
+class Client(ABC):
+ """
+ Base ClickHouse Connect client
+ """
+ compression: str = None
+ write_compression: str = None
+ protocol_version = 0
+ valid_transport_settings = set()
+ optional_transport_settings = set()
+ database = None
+ max_error_message = 0
+
+ def __init__(self,
+ database: str,
+ query_limit: int,
+ uri: str,
+ query_retries: int,
+ server_host_name: Optional[str],
+ apply_server_timezone: Optional[Union[str, bool]]):
+ """
+ Shared initialization of ClickHouse Connect client
+ :param database: database name
+ :param query_limit: default LIMIT for queries
+ :param uri: uri for error messages
+ """
+ self.query_limit = coerce_int(query_limit)
+ self.query_retries = coerce_int(query_retries)
+ self.server_host_name = server_host_name
+ self.server_tz = pytz.UTC
+ self.server_version, server_tz = \
+ tuple(self.command('SELECT version(), timezone()', use_database=False))
+ try:
+ self.server_tz = pytz.timezone(server_tz)
+ except UnknownTimeZoneError:
+ logger.warning('Warning, server is using an unrecognized timezone %s, will use UTC default', server_tz)
+ offsets_differ = datetime.now().astimezone().utcoffset() != datetime.now(tz=self.server_tz).utcoffset()
+ self.apply_server_timezone = apply_server_timezone == 'always' or (
+ coerce_bool(apply_server_timezone) and offsets_differ)
+ readonly = 'readonly'
+ if not self.min_version('19.17'):
+ readonly = common.get_setting('readonly')
+ server_settings = self.query(f'SELECT name, value, {readonly} as readonly FROM system.settings LIMIT 10000')
+ self.server_settings = {row['name']: SettingDef(**row) for row in server_settings.named_results()}
+ if database and not database == '__default__':
+ self.database = database
+ if self.min_version(CH_VERSION_WITH_PROTOCOL):
+ # Unfortunately we have to validate that the client protocol version is actually used by ClickHouse
+ # since the query parameter could be stripped off (in particular, by CHProxy)
+ test_data = self.raw_query('SELECT 1 AS check', fmt='Native', settings={
+ 'client_protocol_version': PROTOCOL_VERSION_WITH_LOW_CARD
+ })
+ if test_data[8:16] == b'\x01\x01\x05check':
+ self.protocol_version = PROTOCOL_VERSION_WITH_LOW_CARD
+ self.uri = uri
+
+ def _validate_settings(self, settings: Optional[Dict[str, Any]]) -> Dict[str, str]:
+ """
+ This strips any ClickHouse settings that are not recognized or are read only.
+ :param settings: Dictionary of setting name and values
+ :return: A filtered dictionary of settings with values rendered as strings
+ """
+ validated = {}
+ invalid_action = common.get_setting('invalid_setting_action')
+ for key, value in settings.items():
+ str_value = self._validate_setting(key, value, invalid_action)
+ if str_value is not None:
+ validated[key] = value
+ return validated
+
+ def _validate_setting(self, key: str, value: Any, invalid_action: str) -> Optional[str]:
+ if key not in self.valid_transport_settings:
+ setting_def = self.server_settings.get(key)
+ if setting_def is None or setting_def.readonly:
+ if key in self.optional_transport_settings:
+ return None
+ if invalid_action == 'send':
+ logger.warning('Attempting to send unrecognized or readonly setting %s', key)
+ elif invalid_action == 'drop':
+                    logger.warning('Dropping unrecognized or readonly setting %s', key)
+ return None
+ else:
+ raise ProgrammingError(f'Setting {key} is unknown or readonly') from None
+ if isinstance(value, bool):
+ return '1' if value else '0'
+ return str(value)
+
+ def _setting_status(self, key: str) -> SettingStatus:
+ comp_setting = self.server_settings.get(key)
+ if not comp_setting:
+ return SettingStatus(False, False)
+ return SettingStatus(comp_setting.value != '0', comp_setting.readonly != 1)
+
+ def _prep_query(self, context: QueryContext):
+ if context.is_select and not context.has_limit and self.query_limit:
+ return f'{context.final_query}\n LIMIT {self.query_limit}'
+ return context.final_query
+
+ def _check_tz_change(self, new_tz) -> Optional[tzinfo]:
+ if new_tz:
+ try:
+ new_tzinfo = pytz.timezone(new_tz)
+ if new_tzinfo != self.server_tz:
+ return new_tzinfo
+ except UnknownTimeZoneError:
+ logger.warning('Unrecognized timezone %s received from ClickHouse', new_tz)
+ return None
+
+ @abstractmethod
+ def _query_with_context(self, context: QueryContext):
+ pass
+
+ @abstractmethod
+ def set_client_setting(self, key, value):
+ """
+        Set a ClickHouse setting for the client after initialization. If a setting is not recognized by ClickHouse,
+        or the setting is identified as "readonly", this call will either raise a ProgrammingError or attempt
+        to send the setting anyway, based on the common setting 'invalid_setting_action'
+ :param key: ClickHouse setting name
+ :param value: ClickHouse setting value
+ """
+
+ @abstractmethod
+ def get_client_setting(self, key) -> Optional[str]:
+ """
+ :param key: The setting key
+ :return: The string value of the setting, if it exists, or None
+ """
+
+ # pylint: disable=too-many-arguments,unused-argument,too-many-locals
+ def query(self,
+ query: str = None,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ query_formats: Optional[Dict[str, str]] = None,
+ column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
+ encoding: Optional[str] = None,
+ use_none: Optional[bool] = None,
+ column_oriented: Optional[bool] = None,
+ use_numpy: Optional[bool] = None,
+ max_str_len: Optional[int] = None,
+ context: QueryContext = None,
+ query_tz: Optional[Union[str, tzinfo]] = None,
+ column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
+ external_data: Optional[ExternalData] = None) -> QueryResult:
+ """
+ Main query method for SELECT, DESCRIBE and other SQL statements that return a result matrix. For
+ parameters, see the create_query_context method
+ :return: QueryResult -- data and metadata from response
+ """
+ if query and query.lower().strip().startswith('select __connect_version__'):
+ return QueryResult([[f'ClickHouse Connect v.{version()} ⓒ ClickHouse Inc.']], None,
+ ('connect_version',), (get_from_name('String'),))
+ kwargs = locals().copy()
+ del kwargs['self']
+ query_context = self.create_query_context(**kwargs)
+ if query_context.is_command:
+ response = self.command(query,
+ parameters=query_context.parameters,
+ settings=query_context.settings,
+ external_data=query_context.external_data)
+ if isinstance(response, QuerySummary):
+ return response.as_query_result()
+ return QueryResult([response] if isinstance(response, list) else [[response]])
+ return self._query_with_context(query_context)
+
+ def query_column_block_stream(self,
+ query: str = None,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ query_formats: Optional[Dict[str, str]] = None,
+ column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
+ encoding: Optional[str] = None,
+ use_none: Optional[bool] = None,
+ context: QueryContext = None,
+ query_tz: Optional[Union[str, tzinfo]] = None,
+ column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
+ external_data: Optional[ExternalData] = None) -> StreamContext:
+ """
+ Variation of main query method that returns a stream of column oriented blocks. For
+ parameters, see the create_query_context method.
+ :return: StreamContext -- Iterable stream context that returns column oriented blocks
+ """
+ return self._context_query(locals(), use_numpy=False, streaming=True).column_block_stream
+
+ def query_row_block_stream(self,
+ query: str = None,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ query_formats: Optional[Dict[str, str]] = None,
+ column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
+ encoding: Optional[str] = None,
+ use_none: Optional[bool] = None,
+ context: QueryContext = None,
+ query_tz: Optional[Union[str, tzinfo]] = None,
+ column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
+ external_data: Optional[ExternalData] = None) -> StreamContext:
+ """
+ Variation of main query method that returns a stream of row oriented blocks. For
+ parameters, see the create_query_context method.
+ :return: StreamContext -- Iterable stream context that returns blocks of rows
+ """
+ return self._context_query(locals(), use_numpy=False, streaming=True).row_block_stream
+
+ def query_rows_stream(self,
+ query: str = None,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ query_formats: Optional[Dict[str, str]] = None,
+ column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
+ encoding: Optional[str] = None,
+ use_none: Optional[bool] = None,
+ context: QueryContext = None,
+ query_tz: Optional[Union[str, tzinfo]] = None,
+ column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
+ external_data: Optional[ExternalData] = None) -> StreamContext:
+ """
+ Variation of main query method that returns a stream of row oriented blocks. For
+ parameters, see the create_query_context method.
+ :return: StreamContext -- Iterable stream context that returns blocks of rows
+ """
+ return self._context_query(locals(), use_numpy=False, streaming=True).rows_stream
+
+ @abstractmethod
+ def raw_query(self, query: str,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ fmt: str = None,
+ use_database: bool = True,
+ external_data: Optional[ExternalData] = None) -> bytes:
+ """
+ Query method that simply returns the raw ClickHouse format bytes
+ :param query: Query statement/format string
+ :param parameters: Optional dictionary used to format the query
+ :param settings: Optional dictionary of ClickHouse settings (key/string values)
+ :param fmt: ClickHouse output format
+        :param use_database: Send the database parameter to ClickHouse so the command will be executed in the client
+        database context.
+        :param external_data: External data to send with the query
+ :return: bytes representing raw ClickHouse return value based on format
+ """
+
+ # pylint: disable=duplicate-code,too-many-arguments,unused-argument
+ def query_np(self,
+ query: str = None,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ query_formats: Optional[Dict[str, str]] = None,
+ column_formats: Optional[Dict[str, str]] = None,
+ encoding: Optional[str] = None,
+ use_none: Optional[bool] = None,
+ max_str_len: Optional[int] = None,
+ context: QueryContext = None,
+ external_data: Optional[ExternalData] = None):
+ """
+ Query method that returns the results as a numpy array. For parameter values, see the
+ create_query_context method
+ :return: Numpy array representing the result set
+ """
+ return self._context_query(locals(), use_numpy=True).np_result
+
+ # pylint: disable=duplicate-code,too-many-arguments,unused-argument
+ def query_np_stream(self,
+ query: str = None,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ query_formats: Optional[Dict[str, str]] = None,
+ column_formats: Optional[Dict[str, str]] = None,
+ encoding: Optional[str] = None,
+ use_none: Optional[bool] = None,
+ max_str_len: Optional[int] = None,
+ context: QueryContext = None,
+ external_data: Optional[ExternalData] = None) -> StreamContext:
+ """
+ Query method that returns the results as a stream of numpy arrays. For parameter values, see the
+ create_query_context method
+        :return: Generator that yields a numpy array per block representing the result set
+ """
+ return self._context_query(locals(), use_numpy=True, streaming=True).np_stream
+
+ # pylint: disable=duplicate-code,too-many-arguments,unused-argument
+ def query_df(self,
+ query: str = None,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ query_formats: Optional[Dict[str, str]] = None,
+ column_formats: Optional[Dict[str, str]] = None,
+ encoding: Optional[str] = None,
+ use_none: Optional[bool] = None,
+ max_str_len: Optional[int] = None,
+ use_na_values: Optional[bool] = None,
+ query_tz: Optional[str] = None,
+ column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
+ context: QueryContext = None,
+ external_data: Optional[ExternalData] = None,
+ use_extended_dtypes: Optional[bool] = None):
+ """
+        Query method that returns the results as a pandas dataframe. For parameter values, see the
+ create_query_context method
+ :return: Pandas dataframe representing the result set
+ """
+ return self._context_query(locals(), use_numpy=True, as_pandas=True).df_result
+
+ # pylint: disable=duplicate-code,too-many-arguments,unused-argument
+ def query_df_stream(self,
+ query: str = None,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ query_formats: Optional[Dict[str, str]] = None,
+ column_formats: Optional[Dict[str, str]] = None,
+ encoding: Optional[str] = None,
+ use_none: Optional[bool] = None,
+ max_str_len: Optional[int] = None,
+ use_na_values: Optional[bool] = None,
+ query_tz: Optional[str] = None,
+ column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
+ context: QueryContext = None,
+ external_data: Optional[ExternalData] = None,
+ use_extended_dtypes: Optional[bool] = None) -> StreamContext:
+ """
+ Query method that returns the results as a StreamContext. For parameter values, see the
+ create_query_context method
+        :return: StreamContext that yields a Pandas dataframe per block of the result set
+ """
+ return self._context_query(locals(), use_numpy=True,
+ as_pandas=True,
+ streaming=True).df_stream
+
+ def create_query_context(self,
+ query: str = None,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ query_formats: Optional[Dict[str, str]] = None,
+ column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
+ encoding: Optional[str] = None,
+ use_none: Optional[bool] = None,
+ column_oriented: Optional[bool] = None,
+ use_numpy: Optional[bool] = False,
+ max_str_len: Optional[int] = 0,
+ context: Optional[QueryContext] = None,
+ query_tz: Optional[Union[str, tzinfo]] = None,
+ column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
+ use_na_values: Optional[bool] = None,
+ streaming: bool = False,
+ as_pandas: bool = False,
+ external_data: Optional[ExternalData] = None,
+ use_extended_dtypes: Optional[bool] = None) -> QueryContext:
+ """
+ Creates or updates a reusable QueryContext object
+ :param query: Query statement/format string
+ :param parameters: Optional dictionary used to format the query
+ :param settings: Optional dictionary of ClickHouse settings (key/string values)
+ :param query_formats: See QueryContext __init__ docstring
+ :param column_formats: See QueryContext __init__ docstring
+ :param encoding: See QueryContext __init__ docstring
+ :param use_none: Use None for ClickHouse NULL instead of default values. Note that using None in Numpy
+ arrays will force the numpy array dtype to 'object', which is often inefficient. This effect also
+ will impact the performance of Pandas dataframes.
+ :param column_oriented: Deprecated. Controls orientation of the QueryResult result_set property
+ :param use_numpy: Return QueryResult columns as one-dimensional numpy arrays
+ :param max_str_len: Limit returned ClickHouse String values to this length, which allows a Numpy
+ structured array even with ClickHouse variable length String columns. If 0, Numpy arrays for
+ String columns will always be object arrays
+ :param context: An existing QueryContext to be updated with any provided parameter values
+        :param query_tz: Either a string or a pytz tzinfo object. (Strings will be converted to tzinfo objects).
+        Values for any DateTime or DateTime64 column in the query will be converted to Python datetime.datetime
+        objects with the selected timezone.
+        :param column_tzs: A dictionary of column names to tzinfo objects (or strings that will be converted to
+        tzinfo objects). The timezone will be applied to datetime objects returned in the query
+        :param use_na_values: Deprecated alias for use_extended_dtypes
+        :param as_pandas: Return the result columns as pandas.Series objects
+        :param streaming: Marker used to correctly configure streaming queries
+        :param external_data: ClickHouse "external data" to send with query
+ :param use_extended_dtypes: Only relevant to Pandas Dataframe queries. Use Pandas "missing types", such as
+ pandas.NA and pandas.NaT for ClickHouse NULL values, as well as extended Pandas dtypes such as IntegerArray
+ and StringArray. Defaulted to True for query_df methods
+ :return: Reusable QueryContext
+ """
+ if context:
+ return context.updated_copy(query=query,
+ parameters=parameters,
+ settings=settings,
+ query_formats=query_formats,
+ column_formats=column_formats,
+ encoding=encoding,
+ server_tz=self.server_tz,
+ use_none=use_none,
+ column_oriented=column_oriented,
+ use_numpy=use_numpy,
+ max_str_len=max_str_len,
+ query_tz=query_tz,
+ column_tzs=column_tzs,
+ as_pandas=as_pandas,
+ use_extended_dtypes=use_extended_dtypes,
+ streaming=streaming,
+ external_data=external_data)
+ if use_numpy and max_str_len is None:
+ max_str_len = 0
+ if use_extended_dtypes is None:
+ use_extended_dtypes = use_na_values
+ if as_pandas and use_extended_dtypes is None:
+ use_extended_dtypes = True
+ return QueryContext(query=query,
+ parameters=parameters,
+ settings=settings,
+ query_formats=query_formats,
+ column_formats=column_formats,
+ encoding=encoding,
+ server_tz=self.server_tz,
+ use_none=use_none,
+ column_oriented=column_oriented,
+ use_numpy=use_numpy,
+ max_str_len=max_str_len,
+ query_tz=query_tz,
+ column_tzs=column_tzs,
+ use_extended_dtypes=use_extended_dtypes,
+ as_pandas=as_pandas,
+ streaming=streaming,
+ apply_server_tz=self.apply_server_timezone,
+ external_data=external_data)
+
+ def query_arrow(self,
+ query: str,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ use_strings: Optional[bool] = None,
+ external_data: Optional[ExternalData] = None):
+ """
+ Query method using the ClickHouse Arrow format to return a PyArrow table
+ :param query: Query statement/format string
+ :param parameters: Optional dictionary used to format the query
+ :param settings: Optional dictionary of ClickHouse settings (key/string values)
+ :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary)
+        :param external_data: ClickHouse "external data" to send with query
+ :return: PyArrow.Table
+ """
+ settings = dict_copy(settings)
+ if self.database:
+ settings['database'] = self.database
+ str_status = self._setting_status(arrow_str_setting)
+ if use_strings is None:
+ if str_status.is_writable and not str_status.is_set:
+ settings[arrow_str_setting] = '1' # Default to returning strings if possible
+ elif use_strings != str_status.is_set:
+ if not str_status.is_writable:
+ raise OperationalError(f'Cannot change readonly {arrow_str_setting} to {use_strings}')
+ settings[arrow_str_setting] = '1' if use_strings else '0'
+ return to_arrow(self.raw_query(query,
+ parameters,
+ settings,
+ fmt='Arrow',
+ external_data=external_data))
+
+ @abstractmethod
+ def command(self,
+ cmd: str,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ data: Union[str, bytes] = None,
+ settings: Dict[str, Any] = None,
+ use_database: bool = True,
+ external_data: Optional[ExternalData] = None) -> Union[str, int, Sequence[str], QuerySummary]:
+ """
+ Client method that returns a single value instead of a result set
+ :param cmd: ClickHouse query/command as a python format string
+ :param parameters: Optional dictionary of key/values pairs to be formatted
+ :param data: Optional 'data' for the command (for INSERT INTO in particular)
+ :param settings: Optional dictionary of ClickHouse settings (key/string values)
+ :param use_database: Send the database parameter to ClickHouse so the command will be executed in the client
+ database context. Otherwise, no database will be specified with the command. This is useful for determining
+ the default user database
+        :param external_data: ClickHouse "external data" to send with command/query
+ :return: Decoded response from ClickHouse as either a string, int, or sequence of strings, or QuerySummary
+ if no data returned
+ """
+
+ @abstractmethod
+ def ping(self) -> bool:
+ """
+        Validate the connection; does not throw an exception (see debug logs)
+ :return: ClickHouse server is up and reachable
+ """
+
+ # pylint: disable=too-many-arguments
+ def insert(self,
+ table: Optional[str] = None,
+ data: Sequence[Sequence[Any]] = None,
+ column_names: Union[str, Iterable[str]] = '*',
+ database: Optional[str] = None,
+ column_types: Sequence[ClickHouseType] = None,
+ column_type_names: Sequence[str] = None,
+ column_oriented: bool = False,
+ settings: Optional[Dict[str, Any]] = None,
+ context: InsertContext = None) -> QuerySummary:
+ """
+        Method to insert multiple rows/data matrix of native Python objects. If context is specified, arguments
+        other than data are ignored
+ :param table: Target table
+ :param data: Sequence of sequences of Python data
+ :param column_names: Ordered list of column names or '*' if column types should be retrieved from the
+ ClickHouse table definition
+ :param database: Target database -- will use client default database if not specified.
+ :param column_types: ClickHouse column types. If set then column data does not need to be retrieved from
+ the server
+ :param column_type_names: ClickHouse column type names. If set then column data does not need to be
+ retrieved from the server
+ :param column_oriented: If true the data is already "pivoted" in column form
+ :param settings: Optional dictionary of ClickHouse settings (key/string values)
+ :param context: Optional reusable insert context to allow repeated inserts into the same table with
+ different data batches
+ :return: QuerySummary with summary information, throws exception if insert fails
+ """
+ if (context is None or context.empty) and data is None:
+ raise ProgrammingError('No data specified for insert') from None
+ if context is None:
+ context = self.create_insert_context(table,
+ column_names,
+ database,
+ column_types,
+ column_type_names,
+ column_oriented,
+ settings)
+ if data is not None:
+ if not context.empty:
+ raise ProgrammingError('Attempting to insert new data with non-empty insert context') from None
+ context.data = data
+ return self.data_insert(context)
+
+ def insert_df(self, table: str = None,
+ df=None,
+ database: Optional[str] = None,
+ settings: Optional[Dict] = None,
+ column_names: Optional[Sequence[str]] = None,
+ column_types: Sequence[ClickHouseType] = None,
+ column_type_names: Sequence[str] = None,
+ context: InsertContext = None) -> QuerySummary:
+ """
+        Insert a pandas DataFrame into ClickHouse. If context is specified, arguments other than df are ignored
+ :param table: ClickHouse table
+ :param df: two-dimensional pandas dataframe
+ :param database: Optional ClickHouse database
+ :param settings: Optional dictionary of ClickHouse settings (key/string values)
+ :param column_names: An optional list of ClickHouse column names. If not set, the DataFrame column names
+ will be used
+ :param column_types: ClickHouse column types. If set then column data does not need to be retrieved from
+ the server
+ :param column_type_names: ClickHouse column type names. If set then column data does not need to be
+ retrieved from the server
+ :param context: Optional reusable insert context to allow repeated inserts into the same table with
+ different data batches
+ :return: QuerySummary with summary information, throws exception if insert fails
+ """
+ if context is None:
+ if column_names is None:
+ column_names = df.columns
+ elif len(column_names) != len(df.columns):
+ raise ProgrammingError('DataFrame column count does not match insert_columns') from None
+ return self.insert(table,
+ df,
+ column_names,
+ database,
+ column_types=column_types,
+ column_type_names=column_type_names,
+ settings=settings, context=context)
+
+ def insert_arrow(self, table: str,
+ arrow_table, database: str = None,
+ settings: Optional[Dict] = None) -> QuerySummary:
+ """
+        Insert a PyArrow Table into ClickHouse using the raw Arrow format
+ :param table: ClickHouse table
+ :param arrow_table: PyArrow Table object
+ :param database: Optional ClickHouse database
+ :param settings: Optional dictionary of ClickHouse settings (key/string values)
+ :return: QuerySummary with summary information, throws exception if insert fails
+ """
+ full_table = table if '.' in table or not database else f'{database}.{table}'
+ column_names, insert_block = arrow_buffer(arrow_table)
+ return self.raw_insert(full_table, column_names, insert_block, settings, 'Arrow')
+
+ def create_insert_context(self,
+ table: str,
+ column_names: Optional[Union[str, Sequence[str]]] = None,
+ database: Optional[str] = None,
+ column_types: Sequence[ClickHouseType] = None,
+ column_type_names: Sequence[str] = None,
+ column_oriented: bool = False,
+ settings: Optional[Dict[str, Any]] = None,
+ data: Optional[Sequence[Sequence[Any]]] = None) -> InsertContext:
+ """
+ Builds a reusable insert context to hold state for a duration of an insert
+ :param table: Target table
+        :param column_names: Optional ordered list of column names. If not set, all columns ('*') will be assumed
+        in the order specified by the table definition
+        :param database: Target database -- will use the client default database if not specified
+ :param column_types: ClickHouse column types. Optional Sequence of ClickHouseType objects. If neither column
+ types nor column type names are set, actual column types will be retrieved from the server.
+ :param column_type_names: ClickHouse column type names. Specified column types by name string
+ :param column_oriented: If true the data is already "pivoted" in column form
+ :param settings: Optional dictionary of ClickHouse settings (key/string values)
+ :param data: Initial dataset for insert
+        :return: Reusable insert context
+ """
+ full_table = table if '.' in table or not database else f'{database}.{table}'
+ column_defs = []
+ if column_types is None and column_type_names is None:
+ describe_result = self.query(f'DESCRIBE TABLE {full_table}')
+ column_defs = [ColumnDef(**row) for row in describe_result.named_results()
+ if row['default_type'] not in ('ALIAS', 'MATERIALIZED')]
+ if column_names is None or isinstance(column_names, str) and column_names == '*':
+ column_names = [cd.name for cd in column_defs]
+ column_types = [cd.ch_type for cd in column_defs]
+ elif isinstance(column_names, str):
+ column_names = [column_names]
+ if len(column_names) == 0:
+ raise ValueError('Column names must be specified for insert')
+ if not column_types:
+ if column_type_names:
+ column_types = [get_from_name(name) for name in column_type_names]
+ else:
+ column_map = {d.name: d for d in column_defs}
+ try:
+ column_types = [column_map[name].ch_type for name in column_names]
+ except KeyError as ex:
+ raise ProgrammingError(f'Unrecognized column {ex} in table {table}') from None
+ if len(column_names) != len(column_types):
+ raise ProgrammingError('Column names do not match column types') from None
+ return InsertContext(full_table,
+ column_names,
+ column_types,
+ column_oriented=column_oriented,
+ settings=settings,
+ data=data)
+
+ def min_version(self, version_str: str) -> bool:
+ """
+ Determine whether the connected server is at least the submitted version
+        For Altinity Stable versions like 22.8.15.25.altinitystable, non-numeric parts of the server version
+        are ignored (hence the isnumeric() check in the first list comprehension)
+        :param version_str: A version string consisting of up to 4 integers delimited by dots
+        :return: True if the server version is at least version_str, False otherwise
+ """
+ try:
+ server_parts = [int(x) for x in self.server_version.split('.') if x.isnumeric()]
+ server_parts.extend([0] * (4 - len(server_parts)))
+ version_parts = [int(x) for x in version_str.split('.')]
+ version_parts.extend([0] * (4 - len(version_parts)))
+ except ValueError:
+ logger.warning('Server %s or requested version %s does not match format of numbers separated by dots',
+ self.server_version, version_str)
+ return False
+ for x, y in zip(server_parts, version_parts):
+ if x > y:
+ return True
+ if x < y:
+ return False
+ return True
+
+ @abstractmethod
+ def data_insert(self, context: InsertContext) -> QuerySummary:
+ """
+ Subclass implementation of the data insert
+        :param context: InsertContext parameter object
+        :return: QuerySummary with summary information; throws an exception if the insert fails
+ """
+
+ @abstractmethod
+ def raw_insert(self, table: str,
+ column_names: Optional[Sequence[str]] = None,
+ insert_block: Union[str, bytes, Generator[bytes, None, None], BinaryIO] = None,
+ settings: Optional[Dict] = None,
+ fmt: Optional[str] = None,
+ compression: Optional[str] = None) -> QuerySummary:
+ """
+ Insert data already formatted in a bytes object
+ :param table: Table name (whether qualified with the database name or not)
+ :param column_names: Sequence of column names
+ :param insert_block: Binary or string data already in a recognized ClickHouse format
+ :param settings: Optional dictionary of ClickHouse settings (key/string values)
+ :param compression: Recognized ClickHouse `Accept-Encoding` header compression value
+ :param fmt: Valid clickhouse format
+ """
+
+ def close(self):
+ """
+ Subclass implementation to close the connection to the server/deallocate the client
+ """
+
+ def _context_query(self, lcls: dict, **overrides):
+ kwargs = lcls.copy()
+ kwargs.pop('self')
+ kwargs.update(overrides)
+ return self._query_with_context((self.create_query_context(**kwargs)))
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, exc_traceback):
+ self.close()
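An end-to-end sketch of the Client surface defined above (not part of the commit), using the HTTP implementation obtained from create_client; a local server is assumed and the table name example_t is hypothetical.

from clickhouse_connect.driver import create_client

client = create_client(host='localhost')
# command() handles DDL and other statements that do not return a result matrix
client.command('CREATE TABLE IF NOT EXISTS example_t (id UInt32, name String) '
               'ENGINE = MergeTree ORDER BY id')
# insert() takes a row-oriented data matrix plus column names
client.insert('example_t', [[1, 'one'], [2, 'two']], column_names=['id', 'name'])
# query() returns a QueryResult; the streaming variants return a StreamContext
result = client.query('SELECT id, name FROM example_t ORDER BY id')
for row in result.result_rows:
    print(row)
with client.query_rows_stream('SELECT id FROM example_t') as stream:
    for row in stream:
        print(row)
client.close()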
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/common.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/common.py
new file mode 100644
index 0000000000..71adb00321
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/common.py
@@ -0,0 +1,206 @@
+import array
+import struct
+import sys
+
+from typing import Sequence, MutableSequence, Dict, Optional, Union, Generator
+
+from clickhouse_connect.driver.exceptions import ProgrammingError, StreamClosedError, DataError
+from clickhouse_connect.driver.types import Closable
+
+# pylint: disable=invalid-name
+must_swap = sys.byteorder == 'big'
+int_size = array.array('i').itemsize
+low_card_version = 1
+
+array_map = {1: 'b', 2: 'h', 4: 'i', 8: 'q'}
+decimal_prec = {32: 9, 64: 18, 128: 38, 256: 79}
+
+if int_size == 2:
+ array_map[4] = 'l'
+
+array_sizes = {v: k for k, v in array_map.items()}
+array_sizes['f'] = 4
+array_sizes['d'] = 8
+np_date_types = {0: '[s]', 3: '[ms]', 6: '[us]', 9: '[ns]'}
+
+
+def array_type(size: int, signed: bool):
+ """
+ Determines the Python array.array code for the requested byte size
+ :param size: byte size
+ :param signed: whether int types should be signed or unsigned
+ :return: Python array.array code
+ """
+ try:
+ code = array_map[size]
+ except KeyError:
+ return None
+ return code if signed else code.upper()
+
+
+def write_array(code: str, column: Sequence, dest: MutableSequence):
+ """
+ Write a column of native Python data matching the array.array code
+ :param code: Python array.array code matching the column data type
+ :param column: Column of native Python values
+ :param dest: Destination byte buffer
+ """
+ if len(column) and not isinstance(column[0], (int, float)):
+ if code in ('f', 'F', 'd', 'D'):
+ column = [float(x) for x in column]
+ else:
+ column = [int(x) for x in column]
+ try:
+ buff = struct.Struct(f'<{len(column)}{code}')
+ dest += buff.pack(*column)
+ except (TypeError, OverflowError, struct.error) as ex:
+ raise DataError('Unable to create Python array. This is usually caused by trying to insert None ' +
+ 'values into a ClickHouse column that is not Nullable') from ex
+
+
+def write_uint64(value: int, dest: MutableSequence):
+ """
+ Write a single UInt64 value to a binary write buffer
+ :param value: UInt64 value to write
+ :param dest: Destination byte buffer
+ """
+ dest.extend(value.to_bytes(8, 'little'))
+
+
+def write_leb128(value: int, dest: MutableSequence):
+ """
+ Write a LEB128 encoded integer to a target binary buffer
+ :param value: Integer value (positive only)
+ :param dest: Target buffer
+ """
+ while True:
+ b = value & 0x7f
+ value >>= 7
+ if value == 0:
+ dest.append(b)
+ return
+ dest.append(0x80 | b)
+
+
+def decimal_size(prec: int):
+ """
+ Determine the bit size of a ClickHouse or Python Decimal needed to store a value of the requested precision
+ :param prec: Precision of the Decimal in total number of base 10 digits
+ :return: Required bit size
+ """
+ if prec < 1 or prec > 79:
+ raise ArithmeticError(f'Invalid precision {prec} for ClickHouse Decimal type')
+ if prec < 10:
+ return 32
+ if prec < 19:
+ return 64
+ if prec < 39:
+ return 128
+ return 256
+
+
+def unescape_identifier(x: str) -> str:
+ if x.startswith('`') and x.endswith('`'):
+ return x[1:-1]
+ return x
+
+
+def dict_copy(source: Dict = None, update: Optional[Dict] = None) -> Dict:
+ copy = source.copy() if source else {}
+ if update:
+ copy.update(update)
+ return copy
+
+
+def empty_gen():
+ yield from ()
+
+
+def coerce_int(val: Optional[Union[str, int]]) -> int:
+ if not val:
+ return 0
+ return int(val)
+
+
+def coerce_bool(val: Optional[Union[str, bool]]):
+ if not val:
+ return False
+ return val in (True, 'True', 'true', '1')
+
+
+class SliceView(Sequence):
+ """
+ Provides a view into a sequence rather than copying. Borrows liberally from
+ https://gist.github.com/mathieucaroff/0cf094325fb5294fb54c6a577f05a2c1
+ Also see the discussion on SO: https://stackoverflow.com/questions/3485475/can-i-create-a-view-on-a-python-list
+ """
+ slots = ('_source', '_range')
+
+ def __init__(self, source: Sequence, source_slice: Optional[slice] = None):
+ if isinstance(source, SliceView):
+ self._source = source._source
+ self._range = source._range[source_slice]
+ else:
+ self._source = source
+ if source_slice is None:
+ self._range = range(len(source))
+ else:
+ self._range = range(len(source))[source_slice]
+
+ def __len__(self):
+ return len(self._range)
+
+ def __getitem__(self, i):
+ if isinstance(i, slice):
+ return SliceView(self._source, i)
+ return self._source[self._range[i]]
+
+ def __str__(self):
+ r = self._range
+ return str(self._source[slice(r.start, r.stop, r.step)])
+
+ def __repr__(self):
+ r = self._range
+ return f'SliceView({self._source[slice(r.start, r.stop, r.step)]})'
+
+ def __eq__(self, other):
+ if self is other:
+ return True
+ if len(self) != len(other):
+ return False
+ for v, w in zip(self, other):
+ if v != w:
+ return False
+ return True
+
+
+class StreamContext:
+ """
+ Wraps a generator and its "source" in a Context. This ensures that the source will be "closed" even if the
+ generator is not fully consumed or there is an exception during consumption
+ """
+ __slots__ = 'source', 'gen', '_in_context'
+
+ def __init__(self, source: Closable, gen: Generator):
+ self.source = source
+ self.gen = gen
+ self._in_context = False
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ if not self._in_context:
+ raise ProgrammingError('Stream should be used within a context')
+ return next(self.gen)
+
+ def __enter__(self):
+ if not self.gen:
+ raise StreamClosedError
+ self._in_context = True
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._in_context = False
+ self.source.close()
+ self.gen = None
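A short sketch (not part of the commit) of the low-level write helpers above; they append raw bytes to a mutable buffer and mirror the reader methods in buffer.py.

from clickhouse_connect.driver.common import array_type, write_leb128, write_uint64

dest = bytearray()
write_leb128(150, dest)   # variable-length encoding -> b'\x96\x01'
write_uint64(42, dest)    # fixed 8-byte little-endian encoding
print(dest.hex())         # 96012a00000000000000

# array_type maps a byte width and signedness to an array.array type code
print(array_type(4, True), array_type(4, False))   # typically 'i' and 'I'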
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/compression.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/compression.py
new file mode 100644
index 0000000000..db69ae3f04
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/compression.py
@@ -0,0 +1,77 @@
+import zlib
+from abc import abstractmethod
+from typing import Union
+
+import lz4
+import lz4.frame
+import zstandard
+
+try:
+ import brotli
+except ImportError:
+ brotli = None
+
+
+available_compression = ['lz4', 'zstd']
+
+if brotli:
+ available_compression.append('br')
+available_compression.extend(['gzip', 'deflate'])
+
+comp_map = {}
+
+
+class Compressor:
+ def __init_subclass__(cls, tag: str, thread_safe: bool = True):
+ comp_map[tag] = cls() if thread_safe else cls
+
+ @abstractmethod
+ def compress_block(self, block) -> Union[bytes, bytearray]:
+ return block
+
+ def flush(self):
+ pass
+
+
+class GzipCompressor(Compressor, tag='gzip', thread_safe=False):
+ def __init__(self, level: int = 6, wbits: int = 31):
+ self.zlib_obj = zlib.compressobj(level=level, wbits=wbits)
+
+ def compress_block(self, block):
+ return self.zlib_obj.compress(block)
+
+ def flush(self):
+ return self.zlib_obj.flush()
+
+
+class Lz4Compressor(Compressor, tag='lz4', thread_safe=False):
+ def __init__(self):
+ self.comp = lz4.frame.LZ4FrameCompressor()
+
+ def compress_block(self, block):
+ output = self.comp.begin(len(block))
+ output += self.comp.compress(block)
+ return output + self.comp.flush()
+
+
+class ZstdCompressor(Compressor, tag='zstd'):
+ def compress_block(self, block):
+ return zstandard.compress(block)
+
+
+class BrotliCompressor(Compressor, tag='br'):
+ def compress_block(self, block):
+ return brotli.compress(block)
+
+
+null_compressor = Compressor()
+
+
+def get_compressor(compression: str) -> Compressor:
+ if not compression:
+ return null_compressor
+ comp = comp_map[compression]
+ try:
+ return comp()
+ except TypeError:
+ return comp
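A sketch (not part of the commit) of how the compressor registry above is used: 'gzip' and 'lz4' are registered with thread_safe=False, so get_compressor builds a fresh instance per call, while 'zstd' and 'br' reuse a shared instance.

import zlib

from clickhouse_connect.driver.compression import available_compression, get_compressor

print(available_compression)
comp = get_compressor('gzip')
payload = comp.compress_block(b'hello clickhouse') + comp.flush()
# wbits=31 tells zlib to expect the gzip framing produced by GzipCompressor
print(zlib.decompress(payload, wbits=31))   # b'hello clickhouse'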
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/constants.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/constants.py
new file mode 100644
index 0000000000..a242e559b9
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/constants.py
@@ -0,0 +1,2 @@
+PROTOCOL_VERSION_WITH_LOW_CARD = 54405
+CH_VERSION_WITH_PROTOCOL = '23.2.1.2537'
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/context.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/context.py
new file mode 100644
index 0000000000..7984fbeebb
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/context.py
@@ -0,0 +1,72 @@
+import logging
+import re
+from datetime import datetime
+from typing import Optional, Dict, Union, Any
+
+import pytz
+
+logger = logging.getLogger(__name__)
+
+_empty_map = {}
+
+
+# pylint: disable=too-many-instance-attributes
+class BaseQueryContext:
+ local_tz: pytz.timezone
+
+ def __init__(self,
+ settings: Optional[Dict[str, Any]] = None,
+ query_formats: Optional[Dict[str, str]] = None,
+ column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
+ encoding: Optional[str] = None,
+ use_extended_dtypes: bool = False,
+ use_numpy: bool = False):
+ self.settings = settings or {}
+ if query_formats is None:
+ self.type_formats = _empty_map
+ else:
+ self.type_formats = {re.compile(type_name.replace('*', '.*'), re.IGNORECASE): fmt
+ for type_name, fmt in query_formats.items()}
+ if column_formats is None:
+ self.col_simple_formats = _empty_map
+ self.col_type_formats = _empty_map
+ else:
+ self.col_simple_formats = {col_name: fmt for col_name, fmt in column_formats.items() if
+ isinstance(fmt, str)}
+ self.col_type_formats = {}
+ for col_name, fmt in column_formats.items():
+ if not isinstance(fmt, str):
+ self.col_type_formats[col_name] = {re.compile(type_name.replace('*', '.*'), re.IGNORECASE): fmt
+ for type_name, fmt in fmt.items()}
+ self.query_formats = query_formats or {}
+ self.column_formats = column_formats or {}
+ self.encoding = encoding
+ self.use_numpy = use_numpy
+ self.use_extended_dtypes = use_extended_dtypes
+ self._active_col_fmt = None
+ self._active_col_type_fmts = _empty_map
+
+ def start_column(self, name: str):
+ self._active_col_fmt = self.col_simple_formats.get(name)
+ self._active_col_type_fmts = self.col_type_formats.get(name, _empty_map)
+
+ def active_fmt(self, ch_type):
+ if self._active_col_fmt:
+ return self._active_col_fmt
+ for type_pattern, fmt in self._active_col_type_fmts.items():
+ if type_pattern.match(ch_type):
+ return fmt
+ for type_pattern, fmt in self.type_formats.items():
+ if type_pattern.match(ch_type):
+ return fmt
+ return None
+
+
+def _init_context_cls():
+ local_tz = datetime.now().astimezone().tzinfo
+ if local_tz.tzname(datetime.now()) in ('UTC', 'GMT', 'Universal', 'GMT-0', 'Zulu', 'Greenwich'):
+ local_tz = pytz.UTC
+ BaseQueryContext.local_tz = local_tz
+
+
+_init_context_cls()
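A small sketch (not part of the commit) of the format-resolution logic above: type patterns support '*' wildcards, and a per-column simple format takes precedence over query-wide type formats; the column names used are hypothetical.

from clickhouse_connect.driver.context import BaseQueryContext

ctx = BaseQueryContext(query_formats={'IPv*': 'string'},
                       column_formats={'raw_col': 'bytes'})
ctx.start_column('some_col')
print(ctx.active_fmt('IPv4'))    # 'string' -- matched by the query-level pattern
ctx.start_column('raw_col')
print(ctx.active_fmt('String'))  # 'bytes' -- the column-level simple format wins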
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/ctypes.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/ctypes.py
new file mode 100644
index 0000000000..e7bb607e68
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/ctypes.py
@@ -0,0 +1,49 @@
+import logging
+import os
+
+import clickhouse_connect.driver.dataconv as pydc
+import clickhouse_connect.driver.npconv as pync
+from clickhouse_connect.driver.buffer import ResponseBuffer
+from clickhouse_connect.driver.common import coerce_bool
+
+logger = logging.getLogger(__name__)
+
+RespBuffCls = ResponseBuffer
+data_conv = pydc
+numpy_conv = pync
+
+
+# pylint: disable=import-outside-toplevel,global-statement
+
+def connect_c_modules():
+ if not coerce_bool(os.environ.get('CLICKHOUSE_CONNECT_USE_C', True)):
+ logger.info('ClickHouse Connect C optimizations disabled')
+ return
+
+ global RespBuffCls, data_conv
+ try:
+ from clickhouse_connect.driverc.buffer import ResponseBuffer as CResponseBuffer
+ import clickhouse_connect.driverc.dataconv as cdc
+
+ data_conv = cdc
+ RespBuffCls = CResponseBuffer
+ logger.debug('Successfully imported ClickHouse Connect C data optimizations')
+ connect_numpy()
+ except ImportError as ex:
+ logger.warning('Unable to connect optimized C data functions [%s], falling back to pure Python',
+ str(ex))
+
+
+def connect_numpy():
+ global numpy_conv
+ try:
+ import clickhouse_connect.driverc.npconv as cnc
+
+ numpy_conv = cnc
+        logger.debug('Successfully imported ClickHouse Connect C/Numpy optimizations')
+ except ImportError as ex:
+ logger.debug('Unable to connect ClickHouse Connect C to Numpy API [%s], falling back to pure Python',
+ str(ex))
+
+
+connect_c_modules()
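The C speedups are wired in once at import time, gated by the CLICKHOUSE_CONNECT_USE_C environment variable. A hedged sketch of opting out (the variable must be set before clickhouse_connect is first imported, and this assumes coerce_bool treats '0' as false, matching its use elsewhere in the driver):

    import os
    os.environ['CLICKHOUSE_CONNECT_USE_C'] = '0'   # must be set before the first import

    import clickhouse_connect.driver.ctypes as ct
    from clickhouse_connect.driver.buffer import ResponseBuffer

    # With the optimizations disabled, the pure Python implementations remain in place
    assert ct.RespBuffCls is ResponseBuffer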
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/dataconv.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/dataconv.py
new file mode 100644
index 0000000000..29c96a9a66
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/dataconv.py
@@ -0,0 +1,129 @@
+import array
+from datetime import datetime, date, tzinfo
+from ipaddress import IPv4Address
+from typing import Sequence, Optional, Any
+from uuid import UUID, SafeUUID
+
+from clickhouse_connect.driver.common import int_size
+from clickhouse_connect.driver.types import ByteSource
+from clickhouse_connect.driver.options import np
+
+
+MONTH_DAYS = (0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334, 365)
+MONTH_DAYS_LEAP = (0, 31, 60, 91, 121, 152, 182, 213, 244, 274, 305, 335, 366)
+
+
+def read_ipv4_col(source: ByteSource, num_rows: int):
+ column = source.read_array('I', num_rows)
+ fast_ip_v4 = IPv4Address.__new__
+ new_col = []
+ app = new_col.append
+ for x in column:
+ ipv4 = fast_ip_v4(IPv4Address)
+ ipv4._ip = x # pylint: disable=protected-access
+ app(ipv4)
+ return new_col
+
+
+def read_datetime_col(source: ByteSource, num_rows: int, tz_info: Optional[tzinfo]):
+ src_array = source.read_array('I', num_rows)
+ if tz_info is None:
+ fts = datetime.utcfromtimestamp
+ return [fts(ts) for ts in src_array]
+ fts = datetime.fromtimestamp
+ return [fts(ts, tz_info) for ts in src_array]
+
+
+def epoch_days_to_date(days: int) -> date:
+ cycles400, rem = divmod(days + 134774, 146097)
+ cycles100, rem = divmod(rem, 36524)
+ cycles, rem = divmod(rem, 1461)
+ years, rem = divmod(rem, 365)
+ year = (cycles << 2) + cycles400 * 400 + cycles100 * 100 + years + 1601
+ if years == 4 or cycles100 == 4:
+ return date(year - 1, 12, 31)
+ m_list = MONTH_DAYS_LEAP if years == 3 and (year == 2000 or year % 100 != 0) else MONTH_DAYS
+ month = (rem + 24) >> 5
+ while rem < m_list[month]:
+ month -= 1
+ return date(year, month + 1, rem + 1 - m_list[month])
+
+
+def read_date_col(source: ByteSource, num_rows: int):
+ column = source.read_array('H', num_rows)
+ return [epoch_days_to_date(x) for x in column]
+
+
+def read_date32_col(source: ByteSource, num_rows: int):
+ column = source.read_array('l' if int_size == 2 else 'i', num_rows)
+ return [epoch_days_to_date(x) for x in column]
+
+
+def read_uuid_col(source: ByteSource, num_rows: int):
+ v = source.read_array('Q', num_rows * 2)
+ empty_uuid = UUID(int=0)
+ new_uuid = UUID.__new__
+ unsafe = SafeUUID.unsafe
+ oset = object.__setattr__
+ column = []
+ app = column.append
+ for i in range(num_rows):
+ ix = i << 1
+ int_value = v[ix] << 64 | v[ix + 1]
+ if int_value == 0:
+ app(empty_uuid)
+ else:
+ fast_uuid = new_uuid(UUID)
+ oset(fast_uuid, 'int', int_value)
+ oset(fast_uuid, 'is_safe', unsafe)
+ app(fast_uuid)
+ return column
+
+
+def read_nullable_array(source: ByteSource, array_type: str, num_rows: int, null_obj: Any):
+ null_map = source.read_bytes(num_rows)
+ column = source.read_array(array_type, num_rows)
+ return [null_obj if null_map[ix] else column[ix] for ix in range(num_rows)]
+
+
+def build_nullable_column(source: Sequence, null_map: bytes, null_obj: Any):
+ return [source[ix] if null_map[ix] == 0 else null_obj for ix in range(len(source))]
+
+
+def build_lc_nullable_column(index: Sequence, keys: array.array, null_obj: Any):
+ column = []
+ for key in keys:
+ if key == 0:
+ column.append(null_obj)
+ else:
+ column.append(index[key])
+ return column
+
+
+def to_numpy_array(column: Sequence):
+    arr = np.empty((len(column),), dtype=np.object_)  # np.object was removed in NumPy 1.24; use np.object_
+ arr[:] = column
+ return arr
+
+
+def pivot(data: Sequence[Sequence], start_row: int, end_row: int) -> Sequence[Sequence]:
+ return tuple(zip(*data[start_row: end_row]))
+
+
+def write_str_col(column: Sequence, encoding: Optional[str], dest: bytearray):
+ app = dest.append
+ for x in column:
+ if not x:
+ app(0)
+ else:
+ if encoding:
+ x = x.encode(encoding)
+ sz = len(x)
+ while True:
+ b = sz & 0x7f
+ sz >>= 7
+ if sz == 0:
+ app(b)
+ break
+ app(0x80 | b)
+ dest += x
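write_str_col above frames every value with a LEB128-style variable-length size prefix (seven data bits per byte, high bit set while more bytes follow) before appending the raw bytes. A self-contained sketch of that framing, for illustration only:

    def frame_strings(column, encoding='utf-8'):
        dest = bytearray()
        for s in column:
            if not s:
                dest.append(0)            # empty or None values are a single zero length byte
                continue
            data = s.encode(encoding)
            size = len(data)
            while True:                   # LEB128 varint: 7 bits per byte, 0x80 flags continuation
                b = size & 0x7F
                size >>= 7
                if size == 0:
                    dest.append(b)
                    break
                dest.append(0x80 | b)
            dest += data
        return bytes(dest)

    assert frame_strings(['hi', '']) == b'\x02hi\x00'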
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/ddl.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/ddl.py
new file mode 100644
index 0000000000..a9a1a4b0aa
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/ddl.py
@@ -0,0 +1,28 @@
+from typing import NamedTuple, Sequence
+
+from clickhouse_connect.datatypes.base import ClickHouseType
+
+
+class TableColumnDef(NamedTuple):
+ """
+ Simplified ClickHouse Table Column definition for DDL
+ """
+ name: str
+ ch_type: ClickHouseType
+ expr_type: str = None
+ expr: str = None
+
+ @property
+ def col_expr(self):
+ expr = f'{self.name} {self.ch_type.name}'
+ if self.expr_type:
+ expr += f' {self.expr_type} {self.expr}'
+ return expr
+
+
+def create_table(table_name: str, columns: Sequence[TableColumnDef], engine: str, engine_params: dict):
+ stmt = f"CREATE TABLE {table_name} ({', '.join(col.col_expr for col in columns)}) ENGINE {engine} "
+ if engine_params:
+ for key, value in engine_params.items():
+ stmt += f' {key} {value}'
+ return stmt
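A hedged example of assembling DDL with the helpers above (assumes the clickhouse-connect type registry is importable; the table and column names are invented):

    from clickhouse_connect.datatypes.registry import get_from_name
    from clickhouse_connect.driver.ddl import TableColumnDef, create_table

    cols = [TableColumnDef('id', get_from_name('UInt64')),
            TableColumnDef('name', get_from_name('String')),
            TableColumnDef('ts', get_from_name('DateTime'), 'DEFAULT', 'now()')]
    ddl = create_table('example_table', cols, 'MergeTree', {'ORDER BY': 'id'})
    # Roughly: CREATE TABLE example_table (id UInt64, name String, ts DateTime DEFAULT now())
    #          ENGINE MergeTree  ORDER BY id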
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/exceptions.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/exceptions.py
new file mode 100644
index 0000000000..1cd41f9933
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/exceptions.py
@@ -0,0 +1,84 @@
+"""
+The driver exception classes here include all named exceptions required by the DB API 2.0 specification. It's not
+clear how useful that naming convention is, but it is used for potentially improved compatibility with other
+libraries. In most cases the docstrings are taken from the DB API 2.0 documentation.
+"""
+
+
+class ClickHouseError(Exception):
+ """Exception related to operation with ClickHouse."""
+
+
+# pylint: disable=redefined-builtin
+class Warning(Warning, ClickHouseError):
+ """Exception raised for important warnings like data truncations
+ while inserting, etc."""
+
+
+class Error(ClickHouseError):
+ """Exception that is the base class of all other error exceptions
+ (not Warning)."""
+
+
+class InterfaceError(Error):
+ """Exception raised for errors that are related to the database
+ interface rather than the database itself."""
+
+
+class DatabaseError(Error):
+ """Exception raised for errors that are related to the
+ database."""
+
+
+class DataError(DatabaseError):
+ """Exception raised for errors that are due to problems with the
+ processed data like division by zero, numeric value out of range,
+ etc."""
+
+
+class OperationalError(DatabaseError):
+ """Exception raised for errors that are related to the database's
+ operation and not necessarily under the control of the programmer,
+ e.g. an unexpected disconnect occurs, the data source name is not
+ found, a transaction could not be processed, a memory allocation
+ error occurred during processing, etc."""
+
+
+class IntegrityError(DatabaseError):
+ """Exception raised when the relational integrity of the database
+ is affected, e.g. a foreign key check fails, duplicate key,
+ etc."""
+
+
+class InternalError(DatabaseError):
+ """Exception raised when the database encounters an internal
+ error, e.g. the cursor is not valid anymore, the transaction is
+ out of sync, etc."""
+
+
+class ProgrammingError(DatabaseError):
+ """Exception raised for programming errors, e.g. table not found
+ or already exists, syntax error in the SQL statement, wrong number
+ of parameters specified, etc."""
+
+
+class NotSupportedError(DatabaseError):
+ """Exception raised in case a method or database API was used
+ which is not supported by the database, e.g. requesting a
+ .rollback() on a connection that does not support transaction or
+ has transactions turned off."""
+
+
+class StreamClosedError(ProgrammingError):
+ """Exception raised when a stream operation is executed on a closed stream."""
+
+ def __init__(self):
+ super().__init__('Executing a streaming operation on a closed stream')
+
+
+class StreamCompleteException(Exception):
+ """ Internal exception used to indicate the end of a ClickHouse query result stream."""
+
+
+class StreamFailureError(Exception):
+ """ Stream failed unexpectedly """
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/external.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/external.py
new file mode 100644
index 0000000000..2d34f71ba8
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/external.py
@@ -0,0 +1,127 @@
+import logging
+from typing import Optional, Sequence, Dict, Union
+from pathlib import Path
+
+from clickhouse_connect.driver.exceptions import ProgrammingError
+
+logger = logging.getLogger(__name__)
+
+
+class ExternalFile:
+ # pylint: disable=too-many-branches
+ def __init__(self,
+ file_path: Optional[str] = None,
+ file_name: Optional[str] = None,
+ data: Optional[bytes] = None,
+ fmt: Optional[str] = None,
+ types: Optional[Union[str, Sequence[str]]] = None,
+ structure: Optional[Union[str, Sequence[str]]] = None,
+ mime_type: Optional[str] = None):
+ if file_path:
+ if data:
+ raise ProgrammingError('Only data or file_path should be specified for external data, not both')
+ try:
+ with open(file_path, 'rb') as file:
+ self.data = file.read()
+ except OSError as ex:
+ raise ProgrammingError(f'Failed to open file {file_path} for external data') from ex
+ path_name = Path(file_path).name
+ path_base = path_name.rsplit('.', maxsplit=1)[0]
+ if not file_name:
+ self.name = path_base
+ self.file_name = path_name
+ else:
+ self.name = file_name.rsplit('.', maxsplit=1)[0]
+ self.file_name = file_name
+ if file_name != path_name and path_base != self.name:
+ logger.warning('External data name %s and file_path %s use different names', file_name, path_name)
+ elif data:
+ if not file_name:
+ raise ProgrammingError('Name is required for query external data')
+ self.data = data
+ self.name = file_name.rsplit('.', maxsplit=1)[0]
+ self.file_name = file_name
+ else:
+ raise ProgrammingError('Either data or file_path must be specified for external data')
+ if types:
+ if structure:
+ raise ProgrammingError('Only types or structure should be specified for external data, not both')
+ self.structure = None
+ if isinstance(types, str):
+ self.types = types
+ else:
+ self.types = ','.join(types)
+ elif structure:
+ self.types = None
+ if isinstance(structure, str):
+ self.structure = structure
+ else:
+ self.structure = ','.join(structure)
+ self.fmt = fmt
+ self.mime_type = mime_type or 'application/octet-stream'
+
+ @property
+ def form_data(self) -> tuple:
+ return self.file_name, self.data, self.mime_type
+
+ @property
+ def query_params(self) -> Dict[str, str]:
+ params = {}
+ for name, value in (('format', self.fmt),
+ ('structure', self.structure),
+ ('types', self.types)):
+ if value:
+ params[f'{self.name}_{name}'] = value
+ return params
+
+
+class ExternalData:
+ def __init__(self,
+ file_path: Optional[str] = None,
+ file_name: Optional[str] = None,
+ data: Optional[bytes] = None,
+ fmt: Optional[str] = None,
+ types: Optional[Union[str, Sequence[str]]] = None,
+ structure: Optional[Union[str, Sequence[str]]] = None,
+ mime_type: Optional[str] = None):
+ self.files: list[ExternalFile] = []
+ if file_path or data:
+ first_file = ExternalFile(file_path=file_path,
+ file_name=file_name,
+ data=data,
+ fmt=fmt,
+ types=types,
+ structure=structure,
+ mime_type=mime_type)
+ self.files.append(first_file)
+
+ def add_file(self,
+ file_path: Optional[str] = None,
+ file_name: Optional[str] = None,
+ data: Optional[bytes] = None,
+ fmt: Optional[str] = None,
+ types: Optional[Union[str, Sequence[str]]] = None,
+ structure: Optional[Union[str, Sequence[str]]] = None,
+ mime_type: Optional[str] = None):
+ self.files.append(ExternalFile(file_path=file_path,
+ file_name=file_name,
+ data=data,
+ fmt=fmt,
+ types=types,
+ structure=structure,
+ mime_type=mime_type))
+
+ @property
+ def form_data(self) -> Dict[str, tuple]:
+ if not self.files:
+ raise ProgrammingError('No external files set for external data')
+ return {file.name: file.form_data for file in self.files}
+
+ @property
+ def query_params(self) -> Dict[str, str]:
+ if not self.files:
+ raise ProgrammingError('No external files set for external data')
+ params = {}
+ for file in self.files:
+ params.update(file.query_params)
+ return params
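A hedged sketch of sending external data with a query (assumes a reachable local ClickHouse server and a file lookup.csv, both invented for illustration; the file's base name 'lookup' becomes the temporary table name in the query):

    import clickhouse_connect
    from clickhouse_connect.driver.external import ExternalData

    ext = ExternalData(file_path='lookup.csv', fmt='CSV',
                       structure=['id UInt32', 'label String'])
    client = clickhouse_connect.get_client(host='localhost')
    rows = client.query('SELECT label FROM lookup WHERE id < 100',
                        external_data=ext).result_rows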
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py
new file mode 100644
index 0000000000..c8fa9e6116
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py
@@ -0,0 +1,473 @@
+import json
+import logging
+import re
+import uuid
+from base64 import b64encode
+from typing import Optional, Dict, Any, Sequence, Union, List, Callable, Generator, BinaryIO
+from urllib.parse import urlencode
+
+from urllib3 import Timeout
+from urllib3.exceptions import HTTPError
+from urllib3.poolmanager import PoolManager
+from urllib3.response import HTTPResponse
+
+from clickhouse_connect import common
+from clickhouse_connect.datatypes import registry
+from clickhouse_connect.datatypes.base import ClickHouseType
+from clickhouse_connect.driver.ctypes import RespBuffCls
+from clickhouse_connect.driver.client import Client
+from clickhouse_connect.driver.common import dict_copy, coerce_bool, coerce_int
+from clickhouse_connect.driver.compression import available_compression
+from clickhouse_connect.driver.exceptions import DatabaseError, OperationalError, ProgrammingError
+from clickhouse_connect.driver.external import ExternalData
+from clickhouse_connect.driver.httputil import ResponseSource, get_pool_manager, get_response_data, \
+ default_pool_manager, get_proxy_manager, all_managers, check_env_proxy, check_conn_reset
+from clickhouse_connect.driver.insert import InsertContext
+from clickhouse_connect.driver.summary import QuerySummary
+from clickhouse_connect.driver.query import QueryResult, QueryContext, quote_identifier, bind_query
+from clickhouse_connect.driver.transform import NativeTransform
+
+logger = logging.getLogger(__name__)
+columns_only_re = re.compile(r'LIMIT 0\s*$', re.IGNORECASE)
+
+
+# pylint: disable=too-many-instance-attributes
+class HttpClient(Client):
+ params = {}
+ valid_transport_settings = {'database', 'buffer_size', 'session_id',
+ 'compress', 'decompress', 'session_timeout',
+ 'session_check', 'query_id', 'quota_key',
+ 'wait_end_of_query', 'client_protocol_version'}
+ optional_transport_settings = {'send_progress_in_http_headers',
+ 'http_headers_progress_interval_ms',
+ 'enable_http_compression'}
+ _owns_pool_manager = False
+
+ # pylint: disable=too-many-arguments,too-many-locals,too-many-branches,too-many-statements,unused-argument
+ def __init__(self,
+ interface: str,
+ host: str,
+ port: int,
+ username: str,
+ password: str,
+ database: str,
+ compress: Union[bool, str] = True,
+ query_limit: int = 0,
+ query_retries: int = 2,
+ connect_timeout: int = 10,
+ send_receive_timeout: int = 300,
+ client_name: Optional[str] = None,
+ verify: bool = True,
+ ca_cert: Optional[str] = None,
+ client_cert: Optional[str] = None,
+ client_cert_key: Optional[str] = None,
+ session_id: Optional[str] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ pool_mgr: Optional[PoolManager] = None,
+ http_proxy: Optional[str] = None,
+ https_proxy: Optional[str] = None,
+ server_host_name: Optional[str] = None,
+ apply_server_timezone: Optional[Union[str, bool]] = True):
+ """
+ Create an HTTP ClickHouse Connect client
+ See clickhouse_connect.get_client for parameters
+ """
+ self.url = f'{interface}://{host}:{port}'
+ self.headers = {}
+ ch_settings = settings or {}
+ self.http = pool_mgr
+ if interface == 'https':
+ if not https_proxy:
+ https_proxy = check_env_proxy('https', host, port)
+ if client_cert:
+ if not username:
+ raise ProgrammingError('username parameter is required for Mutual TLS authentication')
+ self.headers['X-ClickHouse-User'] = username
+ self.headers['X-ClickHouse-SSL-Certificate-Auth'] = 'on'
+ verify = coerce_bool(verify)
+ # pylint: disable=too-many-boolean-expressions
+ if not self.http and (server_host_name or ca_cert or client_cert or not verify or https_proxy):
+ options = {
+ 'ca_cert': ca_cert,
+ 'client_cert': client_cert,
+ 'verify': verify,
+ 'client_cert_key': client_cert_key
+ }
+ if server_host_name:
+ if verify:
+ options['assert_hostname'] = server_host_name
+ options['server_hostname'] = server_host_name
+ self.http = get_pool_manager(https_proxy=https_proxy, **options)
+ self._owns_pool_manager = True
+ if not self.http:
+ if not http_proxy:
+ http_proxy = check_env_proxy('http', host, port)
+ if http_proxy:
+ self.http = get_proxy_manager(host, http_proxy)
+ else:
+ self.http = default_pool_manager()
+
+ if not client_cert and username:
+ self.headers['Authorization'] = 'Basic ' + b64encode(f'{username}:{password}'.encode()).decode()
+ self.headers['User-Agent'] = common.build_client_name(client_name)
+ self._read_format = self._write_format = 'Native'
+ self._transform = NativeTransform()
+
+ connect_timeout, send_receive_timeout = coerce_int(connect_timeout), coerce_int(send_receive_timeout)
+ self.timeout = Timeout(connect=connect_timeout, read=send_receive_timeout)
+ self.http_retries = 1
+ self._send_progress = None
+ self._send_comp_setting = False
+ self._progress_interval = None
+ self._active_session = None
+
+ if session_id:
+ ch_settings['session_id'] = session_id
+ elif 'session_id' not in ch_settings and common.get_setting('autogenerate_session_id'):
+ ch_settings['session_id'] = str(uuid.uuid4())
+
+ if coerce_bool(compress):
+ compression = ','.join(available_compression)
+ self.write_compression = available_compression[0]
+ elif compress and compress not in ('False', 'false', '0'):
+ if compress not in available_compression:
+ raise ProgrammingError(f'Unsupported compression method {compress}')
+ compression = compress
+ self.write_compression = compress
+ else:
+ compression = None
+
+ super().__init__(database=database,
+ uri=self.url,
+ query_limit=query_limit,
+ query_retries=query_retries,
+ server_host_name=server_host_name,
+ apply_server_timezone=apply_server_timezone)
+ self.params = self._validate_settings(ch_settings)
+ comp_setting = self._setting_status('enable_http_compression')
+ self._send_comp_setting = not comp_setting.is_set and comp_setting.is_writable
+ if comp_setting.is_set or comp_setting.is_writable:
+ self.compression = compression
+ send_setting = self._setting_status('send_progress_in_http_headers')
+ self._send_progress = not send_setting.is_set and send_setting.is_writable
+ if (send_setting.is_set or send_setting.is_writable) and \
+ self._setting_status('http_headers_progress_interval_ms').is_writable:
+ self._progress_interval = str(min(120000, max(10000, (send_receive_timeout - 5) * 1000)))
+
+ def set_client_setting(self, key, value):
+ str_value = self._validate_setting(key, value, common.get_setting('invalid_setting_action'))
+ if str_value is not None:
+ self.params[key] = str_value
+
+ def get_client_setting(self, key) -> Optional[str]:
+ values = self.params.get(key)
+ return values[0] if values else None
+
+ def _prep_query(self, context: QueryContext):
+ final_query = super()._prep_query(context)
+ if context.is_insert:
+ return final_query
+ return f'{final_query}\n FORMAT {self._write_format}'
+
+ def _query_with_context(self, context: QueryContext) -> QueryResult:
+ headers = {}
+ params = {}
+ if self.database:
+ params['database'] = self.database
+ if self.protocol_version:
+ params['client_protocol_version'] = self.protocol_version
+ context.block_info = True
+ params.update(context.bind_params)
+ params.update(self._validate_settings(context.settings))
+ if columns_only_re.search(context.uncommented_query):
+ response = self._raw_request(f'{context.final_query}\n FORMAT JSON',
+ params, headers, retries=self.query_retries)
+ json_result = json.loads(response.data)
+            # ClickHouse responds with a JSON object containing meta, data, and several other fields;
+            # we only need the column names and types from the meta sub-object
+ names: List[str] = []
+ types: List[ClickHouseType] = []
+ for col in json_result['meta']:
+ names.append(col['name'])
+ types.append(registry.get_from_name(col['type']))
+ return QueryResult([], None, tuple(names), tuple(types))
+
+ if self.compression:
+ headers['Accept-Encoding'] = self.compression
+ if self._send_comp_setting:
+ params['enable_http_compression'] = '1'
+ final_query = self._prep_query(context)
+ if context.external_data:
+ body = bytes()
+ params['query'] = final_query
+ params.update(context.external_data.query_params)
+ fields = context.external_data.form_data
+ else:
+ body = final_query
+ fields = None
+ headers['Content-Type'] = 'text/plain; charset=utf-8'
+ response = self._raw_request(body,
+ params,
+ headers,
+ stream=True,
+ retries=self.query_retries,
+ fields=fields,
+ server_wait=not context.streaming)
+ byte_source = RespBuffCls(ResponseSource(response)) # pylint: disable=not-callable
+ context.set_response_tz(self._check_tz_change(response.headers.get('X-ClickHouse-Timezone')))
+ query_result = self._transform.parse_response(byte_source, context)
+ query_result.summary = self._summary(response)
+ return query_result
+
+ def data_insert(self, context: InsertContext) -> QuerySummary:
+ """
+ See BaseClient doc_string for this method
+ """
+ if context.empty:
+ logger.debug('No data included in insert, skipping')
+ return QuerySummary()
+
+ def error_handler(resp: HTTPResponse):
+ # If we actually had a local exception when building the insert, throw that instead
+ if context.insert_exception:
+ ex = context.insert_exception
+ context.insert_exception = None
+ raise ex
+ self._error_handler(resp)
+
+ headers = {'Content-Type': 'application/octet-stream'}
+ if context.compression is None:
+ context.compression = self.write_compression
+ if context.compression:
+ headers['Content-Encoding'] = context.compression
+ block_gen = self._transform.build_insert(context)
+
+ params = {}
+ if self.database:
+ params['database'] = self.database
+ params.update(self._validate_settings(context.settings))
+
+ response = self._raw_request(block_gen, params, headers, error_handler=error_handler, server_wait=False)
+ logger.debug('Context insert response code: %d, content: %s', response.status, response.data)
+ context.data = None
+ return QuerySummary(self._summary(response))
+
+ def raw_insert(self, table: str = None,
+ column_names: Optional[Sequence[str]] = None,
+ insert_block: Union[str, bytes, Generator[bytes, None, None], BinaryIO] = None,
+ settings: Optional[Dict] = None,
+ fmt: Optional[str] = None,
+ compression: Optional[str] = None) -> QuerySummary:
+ """
+ See BaseClient doc_string for this method
+ """
+ params = {}
+ headers = {'Content-Type': 'application/octet-stream'}
+ if compression:
+ headers['Content-Encoding'] = compression
+ if table:
+ cols = f" ({', '.join([quote_identifier(x) for x in column_names])})" if column_names is not None else ''
+ query = f'INSERT INTO {table}{cols} FORMAT {fmt if fmt else self._write_format}'
+ if not compression and isinstance(insert_block, str):
+ insert_block = query + '\n' + insert_block
+ elif not compression and isinstance(insert_block, (bytes, bytearray, BinaryIO)):
+ insert_block = (query + '\n').encode() + insert_block
+ else:
+ params['query'] = query
+ if self.database:
+ params['database'] = self.database
+ params.update(self._validate_settings(settings or {}))
+ response = self._raw_request(insert_block, params, headers, server_wait=False)
+ logger.debug('Raw insert response code: %d, content: %s', response.status, response.data)
+ return QuerySummary(self._summary(response))
+
+ @staticmethod
+ def _summary(response: HTTPResponse):
+ summary = {}
+ if 'X-ClickHouse-Summary' in response.headers:
+ try:
+ summary = json.loads(response.headers['X-ClickHouse-Summary'])
+ except json.JSONDecodeError:
+ pass
+ summary['query_id'] = response.headers.get('X-ClickHouse-Query-Id', '')
+ return summary
+
+ def command(self,
+ cmd,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ data: Union[str, bytes] = None,
+ settings: Optional[Dict] = None,
+ use_database: int = True,
+ external_data: Optional[ExternalData] = None) -> Union[str, int, Sequence[str], QuerySummary]:
+ """
+ See BaseClient doc_string for this method
+ """
+ cmd, params = bind_query(cmd, parameters, self.server_tz)
+ headers = {}
+ payload = None
+ fields = None
+ if external_data:
+ if data:
+ raise ProgrammingError('Cannot combine command data with external data') from None
+ fields = external_data.form_data
+ params.update(external_data.query_params)
+ elif isinstance(data, str):
+ headers['Content-Type'] = 'text/plain; charset=utf-8'
+ payload = data.encode()
+ elif isinstance(data, bytes):
+ headers['Content-Type'] = 'application/octet-stream'
+ payload = data
+ if payload is None and not cmd:
+ raise ProgrammingError('Command sent without query or recognized data') from None
+ if payload or fields:
+ params['query'] = cmd
+ else:
+ payload = cmd
+ if use_database and self.database:
+ params['database'] = self.database
+ params.update(self._validate_settings(settings or {}))
+
+ method = 'POST' if payload or fields else 'GET'
+ response = self._raw_request(payload, params, headers, method, fields=fields)
+ if response.data:
+ try:
+ result = response.data.decode()[:-1].split('\t')
+ if len(result) == 1:
+ try:
+ return int(result[0])
+ except ValueError:
+ return result[0]
+ return result
+ except UnicodeDecodeError:
+ return str(response.data)
+ return QuerySummary(self._summary(response))
+
+ def _error_handler(self, response: HTTPResponse, retried: bool = False) -> None:
+        err_str = f'HTTPDriver for {self.url} returned response code {response.status}'
+ try:
+ err_content = get_response_data(response)
+ except Exception: # pylint: disable=broad-except
+ err_content = None
+ finally:
+ response.close()
+
+ if err_content:
+ err_msg = common.format_error(err_content.decode(errors='backslashreplace'))
+            err_str = f'{err_str}\n {err_msg}'
+ raise OperationalError(err_str) if retried else DatabaseError(err_str) from None
+
+ def _raw_request(self,
+ data,
+ params: Dict[str, str],
+ headers: Optional[Dict[str, Any]] = None,
+ method: str = 'POST',
+ retries: int = 0,
+ stream: bool = False,
+ server_wait: bool = True,
+ fields: Optional[Dict[str, tuple]] = None,
+ error_handler: Callable = None) -> HTTPResponse:
+ if isinstance(data, str):
+ data = data.encode()
+ headers = dict_copy(self.headers, headers)
+ attempts = 0
+ if server_wait:
+ params['wait_end_of_query'] = '1'
+            # We can't actually read the progress headers, but we enable them anyway so that ClickHouse
+            # (1) sends something to keep the connection alive while waiting for long-running queries and
+            # (2) returns summary information when we are not streaming the response
+ if self._send_progress:
+ params['send_progress_in_http_headers'] = '1'
+ if self._progress_interval:
+ params['http_headers_progress_interval_ms'] = self._progress_interval
+ final_params = dict_copy(self.params, params)
+ url = f'{self.url}?{urlencode(final_params)}'
+ kwargs = {
+ 'headers': headers,
+ 'timeout': self.timeout,
+ 'retries': self.http_retries,
+ 'preload_content': not stream
+ }
+ if self.server_host_name:
+ kwargs['assert_same_host'] = False
+ kwargs['headers'].update({'Host': self.server_host_name})
+ if fields:
+ kwargs['fields'] = fields
+ else:
+ kwargs['body'] = data
+ check_conn_reset(self.http)
+ query_session = final_params.get('session_id')
+ while True:
+ attempts += 1
+ if query_session:
+ if query_session == self._active_session:
+                    raise ProgrammingError('Attempt to execute concurrent queries within the same session. ' +
+                                           'Please use a separate client instance per thread/process.')
+ # There is a race condition here when using multiprocessing -- in that case the server will
+ # throw an error instead, but in most cases this more helpful error will be thrown first
+ self._active_session = query_session
+ try:
+ response = self.http.request(method, url, **kwargs)
+ except HTTPError as ex:
+ if isinstance(ex.__context__, ConnectionResetError):
+ # The server closed the connection, probably because the Keep Alive has expired
+ # We should be safe to retry, as ClickHouse should not have processed anything on a connection
+ # that it killed. We also only retry this once, as multiple disconnects are unlikely to be
+ # related to the Keep Alive settings
+ if attempts == 1:
+ logger.debug('Retrying remotely closed connection')
+ continue
+ logger.warning('Unexpected Http Driver Exception')
+ raise OperationalError(f'Error {ex} executing HTTP request attempt {attempts} {self.url}') from ex
+ finally:
+ if query_session:
+ self._active_session = None # Make sure we always clear this
+ if 200 <= response.status < 300:
+ return response
+ if response.status in (429, 503, 504):
+ if attempts > retries:
+ self._error_handler(response, True)
+ logger.debug('Retrying requests with status code %d', response.status)
+ elif error_handler:
+ error_handler(response)
+ else:
+ self._error_handler(response)
+
+ def ping(self):
+ """
+ See BaseClient doc_string for this method
+ """
+ try:
+ response = self.http.request('GET', f'{self.url}/ping', timeout=3)
+ return 200 <= response.status < 300
+ except HTTPError:
+ logger.debug('ping failed', exc_info=True)
+ return False
+
+ def raw_query(self, query: str,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None, fmt: str = None,
+ use_database: bool = True, external_data: Optional[ExternalData] = None) -> bytes:
+ """
+ See BaseClient doc_string for this method
+ """
+ final_query, bind_params = bind_query(query, parameters, self.server_tz)
+ if fmt:
+ final_query += f'\n FORMAT {fmt}'
+ params = self._validate_settings(settings or {})
+ if use_database and self.database:
+ params['database'] = self.database
+ params.update(bind_params)
+ if external_data:
+ body = bytes()
+ params['query'] = final_query
+ params.update(external_data.query_params)
+ fields = external_data.form_data
+ else:
+ body = final_query
+ fields = None
+ return self._raw_request(body, params, fields=fields).data
+
+ def close(self):
+ if self._owns_pool_manager:
+ self.http.clear()
+ all_managers.pop(self.http, None)
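HttpClient instances are normally obtained through clickhouse_connect.get_client rather than constructed directly. A hedged sketch of the call paths exercised above (hypothetical local server and credentials):

    import clickhouse_connect

    client = clickhouse_connect.get_client(host='localhost', username='default', password='')
    client.command('SELECT version()')       # small results come back as a string or int
    raw = client.raw_query('SELECT number FROM system.numbers LIMIT 5', fmt='CSV')
    print(raw.decode())                       # raw bytes in the requested format
    client.close()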
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py
new file mode 100644
index 0000000000..9bb8e26508
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py
@@ -0,0 +1,226 @@
+import atexit
+import http
+import logging
+import multiprocessing
+import os
+import sys
+import socket
+import time
+from typing import Dict, Any, Optional
+
+import certifi
+import lz4.frame
+import urllib3
+import zstandard
+from urllib3.poolmanager import PoolManager, ProxyManager
+from urllib3.response import HTTPResponse
+
+from clickhouse_connect.driver.exceptions import ProgrammingError
+from clickhouse_connect import common
+
+logger = logging.getLogger(__name__)
+
+# We disable this warning. Verify must be explicitly set to False, so we assume the user knows what they're doing
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
+
+# Increase this number just to be safe when ClickHouse is returning progress headers
+http.client._MAXHEADERS = 10000 # pylint: disable=protected-access
+
+DEFAULT_KEEP_INTERVAL = 30
+DEFAULT_KEEP_COUNT = 3
+DEFAULT_KEEP_IDLE = 30
+
+SOCKET_TCP = socket.IPPROTO_TCP
+
+core_socket_options = [
+ (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
+ (SOCKET_TCP, socket.TCP_NODELAY, 1),
+ (socket.SOL_SOCKET, socket.SO_SNDBUF, 1024 * 256),
+    (socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 256)
+]
+
+logging.getLogger('urllib3').setLevel(logging.WARNING)
+_proxy_managers = {}
+all_managers = {}
+
+
+@atexit.register
+def close_managers():
+ for manager in all_managers:
+ manager.clear()
+
+
+# pylint: disable=no-member,too-many-arguments,too-many-branches
+def get_pool_manager_options(keep_interval: int = DEFAULT_KEEP_INTERVAL,
+ keep_count: int = DEFAULT_KEEP_COUNT,
+ keep_idle: int = DEFAULT_KEEP_IDLE,
+ ca_cert: str = None,
+ verify: bool = True,
+ client_cert: str = None,
+ client_cert_key: str = None,
+ **options) -> Dict[str, Any]:
+ socket_options = core_socket_options.copy()
+ if getattr(socket, 'TCP_KEEPINTVL', None) is not None:
+ socket_options.append((SOCKET_TCP, socket.TCP_KEEPINTVL, keep_interval))
+ if getattr(socket, 'TCP_KEEPCNT', None) is not None:
+ socket_options.append((SOCKET_TCP, socket.TCP_KEEPCNT, keep_count))
+ if getattr(socket, 'TCP_KEEPIDLE', None) is not None:
+ socket_options.append((SOCKET_TCP, socket.TCP_KEEPIDLE, keep_idle))
+ if sys.platform == 'darwin':
+ socket_options.append((SOCKET_TCP, getattr(socket, 'TCP_KEEPALIVE', 0x10), keep_interval))
+ options['maxsize'] = options.get('maxsize', 8)
+ options['retries'] = options.get('retries', 1)
+ if ca_cert == 'certifi':
+ ca_cert = certifi.where()
+ options['cert_reqs'] = 'CERT_REQUIRED' if verify else 'CERT_NONE'
+ if ca_cert:
+ options['ca_certs'] = ca_cert
+ if client_cert:
+ options['cert_file'] = client_cert
+ if client_cert_key:
+ options['key_file'] = client_cert_key
+ options['socket_options'] = socket_options
+ options['block'] = options.get('block', False)
+ return options
+
+
+def get_pool_manager(keep_interval: int = DEFAULT_KEEP_INTERVAL,
+ keep_count: int = DEFAULT_KEEP_COUNT,
+ keep_idle: int = DEFAULT_KEEP_IDLE,
+ ca_cert: str = None,
+ verify: bool = True,
+ client_cert: str = None,
+ client_cert_key: str = None,
+ http_proxy: str = None,
+ https_proxy: str = None,
+ **options):
+ options = get_pool_manager_options(keep_interval,
+ keep_count,
+ keep_idle,
+ ca_cert,
+ verify,
+ client_cert,
+ client_cert_key,
+ **options)
+ if http_proxy:
+ if https_proxy:
+ raise ProgrammingError('Only one of http_proxy or https_proxy should be specified')
+ if not http_proxy.startswith('http'):
+ http_proxy = f'http://{http_proxy}'
+ manager = ProxyManager(http_proxy, **options)
+ elif https_proxy:
+ if not https_proxy.startswith('http'):
+ https_proxy = f'https://{https_proxy}'
+ manager = ProxyManager(https_proxy, **options)
+ else:
+ manager = PoolManager(**options)
+ all_managers[manager] = int(time.time())
+ return manager
+
+
+def check_conn_reset(manager: PoolManager):
+ reset_seconds = common.get_setting('max_connection_age')
+ if reset_seconds:
+ last_reset = all_managers.get(manager, 0)
+ now = int(time.time())
+ if last_reset < now - reset_seconds:
+ logger.debug('connection reset')
+ manager.clear()
+ all_managers[manager] = now
+
+
+def get_proxy_manager(host: str, http_proxy):
+ key = f'{host}__{http_proxy}'
+ if key in _proxy_managers:
+ return _proxy_managers[key]
+ proxy_manager = get_pool_manager(http_proxy=http_proxy)
+ _proxy_managers[key] = proxy_manager
+ return proxy_manager
+
+
+def get_response_data(response: HTTPResponse) -> bytes:
+ encoding = response.headers.get('content-encoding', None)
+ if encoding == 'zstd':
+ try:
+ zstd_decom = zstandard.ZstdDecompressor()
+ return zstd_decom.stream_reader(response.data).read()
+ except zstandard.ZstdError:
+ pass
+ if encoding == 'lz4':
+ lz4_decom = lz4.frame.LZ4FrameDecompressor()
+ return lz4_decom.decompress(response.data, len(response.data))
+ return response.data
+
+
+def check_env_proxy(scheme: str, host: str, port: int) -> Optional[str]:
+ env_var = f'{scheme}_proxy'.lower()
+ proxy = os.environ.get(env_var)
+ if not proxy:
+ proxy = os.environ.get(env_var.upper())
+ if not proxy:
+ return None
+ no_proxy = os.environ.get('no_proxy')
+ if not no_proxy:
+ no_proxy = os.environ.get('NO_PROXY')
+ if not no_proxy:
+ return proxy
+ if no_proxy == '*':
+ return None # Wildcard no proxy means don't actually proxy anything
+ host = host.lower()
+ for name in no_proxy.split(','):
+ name = name.strip()
+ if name:
+ name = name.lstrip('.').lower()
+ if name in (host, f'{host}:{port}'):
+ return None # Host or host/port matches
+ if host.endswith('.' + name):
+ return None # Domain matches
+ return proxy
+
+
+_default_pool_manager = get_pool_manager()
+
+
+def default_pool_manager():
+ if multiprocessing.current_process().name == 'MainProcess':
+ return _default_pool_manager
+ # PoolManagers don't seem to be safe for some multiprocessing environments, always return a new one
+ return get_pool_manager()
+
+
+class ResponseSource:
+ def __init__(self, response: HTTPResponse, chunk_size: int = 1024 * 1024):
+ self.response = response
+ compression = response.headers.get('content-encoding')
+ if compression == 'zstd':
+ zstd_decom = zstandard.ZstdDecompressor().decompressobj()
+
+ def decompress():
+ while True:
+ chunk = response.read(chunk_size, decode_content=False)
+ if not chunk:
+ break
+ yield zstd_decom.decompress(chunk)
+
+ self.gen = decompress()
+ elif compression == 'lz4':
+ lz4_decom = lz4.frame.LZ4FrameDecompressor()
+
+ def decompress():
+ while lz4_decom.needs_input:
+ data = self.response.read(chunk_size, decode_content=False)
+ if lz4_decom.unused_data:
+ data = lz4_decom.unused_data + data
+ if not data:
+ return
+ chunk = lz4_decom.decompress(data)
+ if chunk:
+ yield chunk
+
+ self.gen = decompress()
+ else:
+ self.gen = response.stream(amt=chunk_size, decode_content=True)
+
+ def close(self):
+ self.response.drain_conn()
+ self.response.close()
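check_env_proxy above honors the usual http_proxy/https_proxy/no_proxy conventions, including domain-suffix matches. A small sketch with purely illustrative environment values:

    import os
    from clickhouse_connect.driver.httputil import check_env_proxy

    os.environ['HTTPS_PROXY'] = 'http://proxy.internal:3128'
    os.environ['NO_PROXY'] = 'localhost,.corp.example.com'

    check_env_proxy('https', 'clickhouse.corp.example.com', 8443)  # None -- suffix matches no_proxy
    check_env_proxy('https', 'example.org', 8443)                  # 'http://proxy.internal:3128'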
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/insert.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/insert.py
new file mode 100644
index 0000000000..c7861b3077
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/insert.py
@@ -0,0 +1,199 @@
+import logging
+from math import log
+from typing import Iterable, Sequence, Optional, Any, Dict, NamedTuple, Generator, Union, TYPE_CHECKING
+
+from clickhouse_connect.driver.query import quote_identifier
+
+from clickhouse_connect.driver.ctypes import data_conv
+from clickhouse_connect.driver.context import BaseQueryContext
+from clickhouse_connect.driver.options import np, pd, pd_time_test
+from clickhouse_connect.driver.exceptions import ProgrammingError
+
+if TYPE_CHECKING:
+ from clickhouse_connect.datatypes.base import ClickHouseType
+
+logger = logging.getLogger(__name__)
+DEFAULT_BLOCK_BYTES = 1 << 21 # Try to generate blocks between 1MB and 2MB in raw size
+
+
+class InsertBlock(NamedTuple):
+ prefix: bytes
+ column_count: int
+ row_count: int
+ column_names: Iterable[str]
+ column_types: Iterable['ClickHouseType']
+ column_data: Iterable[Sequence[Any]]
+
+
+# pylint: disable=too-many-instance-attributes
+class InsertContext(BaseQueryContext):
+ """
+ Reusable Argument/parameter object for inserts.
+ """
+
+ # pylint: disable=too-many-arguments
+ def __init__(self,
+ table: str,
+ column_names: Sequence[str],
+ column_types: Sequence['ClickHouseType'],
+ data: Any = None,
+ column_oriented: Optional[bool] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ compression: Optional[Union[str, bool]] = None,
+ query_formats: Optional[Dict[str, str]] = None,
+ column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
+ block_size: Optional[int] = None):
+ super().__init__(settings, query_formats, column_formats)
+ self.table = table
+ self.column_names = column_names
+ self.column_types = column_types
+ self.column_oriented = False if column_oriented is None else column_oriented
+ self.compression = compression
+ self.req_block_size = block_size
+ self.block_row_count = DEFAULT_BLOCK_BYTES
+ self.data = data
+ self.insert_exception = None
+
+ @property
+ def empty(self) -> bool:
+ return self._data is None
+
+ @property
+ def data(self):
+ return self._raw_data
+
+ @data.setter
+ def data(self, data: Any):
+ self._raw_data = data
+ self.current_block = 0
+ self.current_row = 0
+ self.row_count = 0
+ self.column_count = 0
+ self._data = None
+ if data is None or len(data) == 0:
+ return
+ if pd and isinstance(data, pd.DataFrame):
+ data = self._convert_pandas(data)
+ self.column_oriented = True
+ if np and isinstance(data, np.ndarray):
+ data = self._convert_numpy(data)
+ if self.column_oriented:
+ self._next_block_data = self._column_block_data
+ self._block_columns = data # [SliceView(column) for column in data]
+ self._block_rows = None
+ self.column_count = len(data)
+ self.row_count = len(data[0])
+ else:
+ self._next_block_data = self._row_block_data
+ self._block_rows = data
+ self._block_columns = None
+ self.row_count = len(data)
+ self.column_count = len(data[0])
+ if self.row_count and self.column_count:
+ if self.column_count != len(self.column_names):
+ raise ProgrammingError('Insert data column count does not match column names')
+ self._data = data
+ self.block_row_count = self._calc_block_size()
+
+ def _calc_block_size(self) -> int:
+ if self.req_block_size:
+ return self.req_block_size
+ row_size = 0
+ sample_size = min((log(self.row_count) + 1) * 2, 64)
+ sample_freq = max(1, int(self.row_count / sample_size))
+ for i, d_type in enumerate(self.column_types):
+ if d_type.byte_size:
+ row_size += d_type.byte_size
+ continue
+ if self.column_oriented:
+ col_data = self._data[i]
+ if sample_freq == 1:
+ d_size = d_type.data_size(col_data)
+ else:
+ sample = [col_data[j] for j in range(0, self.row_count, sample_freq)]
+ d_size = d_type.data_size(sample)
+ else:
+ data = self._data
+ sample = [data[j][i] for j in range(0, self.row_count, sample_freq)]
+ d_size = d_type.data_size(sample)
+ row_size += d_size
+ return 1 << (21 - int(log(row_size, 2)))
+
+ def next_block(self) -> Generator[InsertBlock, None, None]:
+ while True:
+ block_end = min(self.current_row + self.block_row_count, self.row_count)
+ row_count = block_end - self.current_row
+ if row_count <= 0:
+ return
+ if self.current_block == 0:
+ cols = f" ({', '.join([quote_identifier(x) for x in self.column_names])})"
+ prefix = f'INSERT INTO {self.table}{cols} FORMAT Native\n'.encode()
+ else:
+ prefix = bytes()
+ self.current_block += 1
+ data = self._next_block_data(self.current_row, block_end)
+ yield InsertBlock(prefix, self.column_count, row_count, self.column_names, self.column_types, data)
+ self.current_row = block_end
+
+ def _column_block_data(self, block_start, block_end):
+ if block_start == 0 and self.row_count <= block_end:
+ return self._block_columns # Optimization if we don't need to break up the block
+ return [col[block_start: block_end] for col in self._block_columns]
+
+ def _row_block_data(self, block_start, block_end):
+ return data_conv.pivot(self._block_rows, block_start, block_end)
+
+ def _convert_pandas(self, df):
+ data = []
+ for df_col_name, col_name, ch_type in zip(df.columns, self.column_names, self.column_types):
+ df_col = df[df_col_name]
+ d_type = str(df_col.dtype)
+ if ch_type.python_type == int:
+ if 'float' in d_type:
+ df_col = df_col.round().astype(ch_type.base_type, copy=False)
+ else:
+ df_col = df_col.astype(ch_type.base_type, copy=False)
+ elif 'datetime' in ch_type.np_type and (pd_time_test(df_col) or 'datetime64[ns' in d_type):
+ div = ch_type.nano_divisor
+ data.append([None if pd.isnull(x) else x.value // div for x in df_col])
+ self.column_formats[col_name] = 'int'
+ continue
+ if ch_type.nullable:
+ if d_type == 'object':
+ # This is ugly, but the multiple replaces seem required as a result of this bug:
+ # https://github.com/pandas-dev/pandas/issues/29024
+ df_col = df_col.replace({pd.NaT: None}).replace({np.nan: None})
+ elif 'Float' in ch_type.base_type:
+ # This seems to be the only way to convert any null looking things to nan
+ df_col = df_col.astype(ch_type.np_type)
+ else:
+ df_col = df_col.replace({np.nan: None})
+ data.append(df_col.to_numpy(copy=False))
+ return data
+
+ def _convert_numpy(self, np_array):
+ if np_array.dtype.names is None:
+ if 'date' in str(np_array.dtype):
+ for col_name, col_type in zip(self.column_names, self.column_types):
+ if 'date' in col_type.np_type:
+ self.column_formats[col_name] = 'int'
+ return np_array.astype('int').tolist()
+ for col_type in self.column_types:
+ if col_type.byte_size == 0 or col_type.byte_size > np_array.dtype.itemsize:
+ return np_array.tolist()
+ return np_array
+
+ if set(self.column_names).issubset(set(np_array.dtype.names)):
+ data = [np_array[col_name] for col_name in self.column_names]
+ else:
+ # Column names don't match, so we have to assume they are in order
+ data = [np_array[col_name] for col_name in np_array.dtype.names]
+ for ix, (col_name, col_type) in enumerate(zip(self.column_names, self.column_types)):
+ d_type = data[ix].dtype
+ if 'date' in str(d_type) and 'date' in col_type.np_type:
+ self.column_formats[col_name] = 'int'
+ data[ix] = data[ix].astype(int).tolist()
+ elif col_type.byte_size == 0 or col_type.byte_size > d_type.itemsize:
+ data[ix] = data[ix].tolist()
+ self.column_oriented = True
+ return data
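Insert blocks are sized by _calc_block_size above: fixed-width columns contribute their declared byte_size, variable-width columns are sampled, and the estimated row size is folded into a power-of-two row count. A worked sketch of that final step (the row sizes are made-up estimates):

    from math import log

    def rows_per_block(estimated_row_size: int) -> int:
        # mirrors the last line of InsertContext._calc_block_size
        return 1 << (21 - int(log(estimated_row_size, 2)))

    rows_per_block(24)    # 1 << 17 == 131072 rows per block
    rows_per_block(100)   # 1 << 15 == 32768 rows per block
    rows_per_block(4000)  # 1 << 10 == 1024 rows per block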
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/models.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/models.py
new file mode 100644
index 0000000000..054c7b686a
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/models.py
@@ -0,0 +1,37 @@
+from typing import NamedTuple
+
+from clickhouse_connect.datatypes.registry import get_from_name
+
+
+class ColumnDef(NamedTuple):
+ """
+ ClickHouse column definition from DESCRIBE TABLE command
+ """
+ name: str
+ type: str
+ default_type: str
+ default_expression: str
+ comment: str
+ codec_expression: str
+ ttl_expression: str
+
+ @property
+ def ch_type(self):
+ return get_from_name(self.type)
+
+
+class SettingDef(NamedTuple):
+ """
+ ClickHouse setting definition from system.settings table
+ """
+ name: str
+ value: str
+ readonly: int
+
+
+class SettingStatus(NamedTuple):
+ """
+ Get the setting "status" from a ClickHouse server setting
+ """
+ is_set: bool
+ is_writable: bool
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/npconv.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/npconv.py
new file mode 100644
index 0000000000..df99550d34
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/npconv.py
@@ -0,0 +1,9 @@
+from clickhouse_connect.driver.options import np
+
+from clickhouse_connect.driver.types import ByteSource
+
+
+def read_numpy_array(source: ByteSource, np_type: str, num_rows: int):
+ dtype = np.dtype(np_type)
+ buffer = source.read_bytes(dtype.itemsize * num_rows)
+ return np.frombuffer(buffer, dtype, num_rows)
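A standalone illustration of the frombuffer call used above, reading three little-endian UInt32 values from raw bytes:

    import numpy as np

    raw = (1).to_bytes(4, 'little') + (2).to_bytes(4, 'little') + (3).to_bytes(4, 'little')
    arr = np.frombuffer(raw, np.dtype('<u4'), 3)
    print(arr)   # [1 2 3]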
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/npquery.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/npquery.py
new file mode 100644
index 0000000000..1c063e4082
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/npquery.py
@@ -0,0 +1,132 @@
+import logging
+from typing import Generator, Sequence, Tuple
+
+from clickhouse_connect.driver.common import empty_gen, StreamContext
+from clickhouse_connect.driver.exceptions import StreamClosedError
+from clickhouse_connect.driver.types import Closable
+from clickhouse_connect.driver.options import np, pd
+
+logger = logging.getLogger(__name__)
+
+
+# pylint: disable=too-many-instance-attributes
+class NumpyResult(Closable):
+ def __init__(self,
+ block_gen: Generator[Sequence, None, None] = None,
+ column_names: Tuple = (),
+ column_types: Tuple = (),
+ d_types: Sequence = (),
+ source: Closable = None):
+ self.column_names = column_names
+ self.column_types = column_types
+ self.np_types = d_types
+ self.source = source
+ self.query_id = ''
+ self.summary = {}
+ self._block_gen = block_gen or empty_gen()
+ self._numpy_result = None
+ self._df_result = None
+
+ def _np_stream(self) -> Generator:
+ if self._block_gen is None:
+ raise StreamClosedError
+
+ block_gen = self._block_gen
+ self._block_gen = None
+ if not self.np_types:
+ return block_gen
+
+ d_types = self.np_types
+ first_type = d_types[0]
+ if first_type != np.object_ and all(np.dtype(np_type) == first_type for np_type in d_types):
+ self.np_types = first_type
+
+ def numpy_blocks():
+ for block in block_gen:
+ yield np.array(block, first_type).transpose()
+ else:
+ if any(x == np.object_ for x in d_types):
+ self.np_types = [np.object_] * len(self.np_types)
+ self.np_types = np.dtype(list(zip(self.column_names, d_types)))
+
+ def numpy_blocks():
+ for block in block_gen:
+ np_array = np.empty(len(block[0]), dtype=self.np_types)
+ for col_name, data in zip(self.column_names, block):
+ np_array[col_name] = data
+ yield np_array
+
+ return numpy_blocks()
+
+ def _df_stream(self) -> Generator:
+ if self._block_gen is None:
+ raise StreamClosedError
+ block_gen = self._block_gen
+
+ def pd_blocks():
+ for block in block_gen:
+ yield pd.DataFrame(dict(zip(self.column_names, block)))
+
+ self._block_gen = None
+ return pd_blocks()
+
+ def close_numpy(self):
+ if not self._block_gen:
+ raise StreamClosedError
+ chunk_size = 4
+ pieces = []
+ blocks = []
+ for block in self._np_stream():
+ blocks.append(block)
+ if len(blocks) == chunk_size:
+ pieces.append(np.concatenate(blocks, dtype=self.np_types))
+ chunk_size *= 2
+ blocks = []
+ pieces.extend(blocks)
+ if len(pieces) > 1:
+ self._numpy_result = np.concatenate(pieces, dtype=self.np_types)
+ elif len(pieces) == 1:
+ self._numpy_result = pieces[0]
+ else:
+ self._numpy_result = np.empty((0,))
+ self.close()
+ return self
+
+ def close_df(self):
+ pieces = list(self._df_stream())
+ if len(pieces) > 1:
+ self._df_result = pd.concat(pieces, ignore_index=True)
+ elif len(pieces) == 1:
+ self._df_result = pieces[0]
+ else:
+ self._df_result = pd.DataFrame()
+ self.close()
+ return self
+
+ @property
+ def np_result(self):
+ if self._numpy_result is None:
+ self.close_numpy()
+ return self._numpy_result
+
+ @property
+ def df_result(self):
+ if self._df_result is None:
+ self.close_df()
+ return self._df_result
+
+ @property
+ def np_stream(self) -> StreamContext:
+ return StreamContext(self, self._np_stream())
+
+ @property
+ def df_stream(self) -> StreamContext:
+ return StreamContext(self, self._df_stream())
+
+ def close(self):
+ if self._block_gen is not None:
+ self._block_gen.close()
+ self._block_gen = None
+ if self.source:
+ self.source.close()
+ self.source = None
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/options.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/options.py
new file mode 100644
index 0000000000..4cec665c03
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/options.py
@@ -0,0 +1,52 @@
+from clickhouse_connect.driver.exceptions import NotSupportedError
+
+pd_time_test = None
+pd_extended_dtypes = False
+
+try:
+ import numpy as np
+except ImportError:
+ np = None
+
+try:
+ import pandas as pd
+ pd_extended_dtypes = not pd.__version__.startswith('0')
+ try:
+ from pandas.core.dtypes.common import is_datetime64_dtype
+ from pandas.core.dtypes.common import is_timedelta64_dtype
+
+ def combined_test(arr_or_dtype):
+ return is_datetime64_dtype(arr_or_dtype) or is_timedelta64_dtype(arr_or_dtype)
+
+ pd_time_test = combined_test
+ except ImportError:
+ try:
+ from pandas.core.dtypes.common import is_datetime_or_timedelta_dtype
+ pd_time_test = is_datetime_or_timedelta_dtype
+ except ImportError as ex:
+ raise NotSupportedError('pandas version does not contain expected test for temporal types') from ex
+except ImportError:
+ pd = None
+
+try:
+ import pyarrow as arrow
+except ImportError:
+ arrow = None
+
+
+def check_numpy():
+ if np:
+ return np
+ raise NotSupportedError('Numpy package is not installed')
+
+
+def check_pandas():
+ if pd:
+ return pd
+ raise NotSupportedError('Pandas package is not installed')
+
+
+def check_arrow():
+ if arrow:
+ return arrow
+ raise NotSupportedError('PyArrow package is not installed')
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/parser.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/parser.py
new file mode 100644
index 0000000000..a158e7f999
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/parser.py
@@ -0,0 +1,166 @@
+from typing import Union, Tuple
+
+from clickhouse_connect.driver.common import unescape_identifier
+
+
+# pylint: disable=too-many-branches
+def parse_callable(expr) -> Tuple[str, Tuple[Union[str, int], ...], str]:
+ """
+    Parses a single level of a ClickHouse expression as an optionally 'callable' function/identifier. The identifier is returned as the
+ first value in the response tuple. If the expression is callable -- i.e. an identifier followed by 0 or more
+ arguments in parentheses, the second returned value is a tuple of the comma separated arguments. The third and
+ final tuple value is any text remaining after the initial expression for further parsing/processing.
+
+ Examples:
+ "Tuple(String, Enum('one' = 1, 'two' = 2))" will return "Tuple", ("String", "Enum('one' = 1,'two' = 2)"), ""
+ "MergeTree() PARTITION BY key" will return "MergeTree", (), "PARTITION BY key"
+
+ :param expr: ClickHouse DDL or Column Name expression
+ :return: Tuple of the identifier, a tuple of arguments, and remaining text
+ """
+ expr = expr.strip()
+ pos = expr.find('(')
+ space = expr.find(' ')
+ if pos == -1 and space == -1:
+ return expr, (), ''
+ if space != -1 and (pos == -1 or space < pos):
+ return expr[:space], (), expr[space:].strip()
+ name = expr[:pos]
+ pos += 1 # Skip first paren
+ values = []
+ value = ''
+ in_str = False
+ level = 0
+
+ def add_value():
+ try:
+ values.append(int(value))
+ except ValueError:
+ values.append(value)
+
+ while True:
+ char = expr[pos]
+ pos += 1
+ if in_str:
+ value += char
+ if char == "'":
+ in_str = False
+ elif char == '\\' and expr[pos] == "'" and expr[pos:pos + 4] != "' = " and expr[pos:pos + 2] != "')":
+ value += expr[pos]
+ pos += 1
+ else:
+ if level == 0:
+ if char == ' ':
+ space = pos
+ temp_char = expr[space]
+ while temp_char == ' ':
+ space += 1
+ temp_char = expr[space]
+ if not value or temp_char in "()',=><0":
+ char = temp_char
+ pos = space + 1
+ if char == ',':
+ add_value()
+ value = ''
+ continue
+ if char == ')':
+ break
+ if char == "'" and (not value or 'Enum' in value):
+ in_str = True
+ elif char == '(':
+ level += 1
+ elif char == ')' and level:
+ level -= 1
+ value += char
+ if value != '':
+ add_value()
+ return name, tuple(values), expr[pos:].strip()
+
+
+def parse_enum(expr) -> Tuple[Tuple[str], Tuple[int]]:
+ """
+ Parse a ClickHouse enum definition expression of the form ('key1' = 1, 'key2' = 2)
+ :param expr: ClickHouse enum expression/arguments
+ :return: Parallel tuples of string enum keys and integer enum values
+ """
+ keys = []
+ values = []
+ pos = expr.find('(') + 1
+ in_key = False
+ key = []
+ value = []
+ while True:
+ char = expr[pos]
+ pos += 1
+ if in_key:
+ if char == "'":
+ keys.append(''.join(key))
+ key = []
+ in_key = False
+ elif char == '\\' and expr[pos] == "'" and expr[pos:pos + 4] != "' = " and expr[pos:] != "')":
+ key.append(expr[pos])
+ pos += 1
+ else:
+ key.append(char)
+ elif char not in (' ', '='):
+ if char == ',':
+ values.append(int(''.join(value)))
+ value = []
+ elif char == ')':
+ values.append(int(''.join(value)))
+ break
+ elif char == "'" and not value:
+ in_key = True
+ else:
+ value.append(char)
+ values, keys = zip(*sorted(zip(values, keys)))
+ return tuple(keys), tuple(values)
+
+
+def parse_columns(expr: str):
+ """
+ Parse a ClickHouse column list of the form (col1 String, col2 Array(Tuple(String, Int32))). This also handles
+ unnamed columns (such as Tuple definitions). Mixed named and unnamed columns are not currently supported.
+    :param expr: ClickHouse column list expression/arguments
+    :return: Parallel tuples of column names and column types (as strings)
+ """
+ names = []
+ columns = []
+ pos = 1
+ named = False
+ level = 0
+ label = ''
+ in_str = False
+ while True:
+ char = expr[pos]
+ pos += 1
+ if in_str:
+ if "'" == char:
+ in_str = False
+ elif char == '\\' and expr[pos] == "'" and expr[pos:pos + 4] != "' = " and expr[pos:pos + 2] != "')":
+ label += expr[pos]
+ pos += 1
+ else:
+ if level == 0:
+ if char == ' ':
+ if label and not named:
+ names.append(unescape_identifier(label))
+ label = ''
+ named = True
+ char = ''
+ elif char == ',':
+ columns.append(label)
+ named = False
+ label = ''
+ continue
+ elif char == ')':
+ columns.append(label)
+ break
+ if char == "'" and (not label or 'Enum' in label):
+ in_str = True
+ elif char == '(':
+ level += 1
+ elif char == ')':
+ level -= 1
+ label += char
+ return tuple(names), tuple(columns)
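+
+# Illustrative usage (a sketch mirroring the docstring example above):
+#
+#     names, types = parse_columns('(col1 String, col2 Array(Tuple(String, Int32)))')
+#     assert names == ('col1', 'col2')
+#     assert types == ('String', 'Array(Tuple(String, Int32))')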
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/query.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/query.py
new file mode 100644
index 0000000000..0b5086ae11
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/query.py
@@ -0,0 +1,496 @@
+import ipaddress
+import logging
+import re
+import uuid
+import pytz
+
+from enum import Enum
+from typing import Any, Tuple, Dict, Sequence, Optional, Union, Generator
+from datetime import date, datetime, tzinfo
+
+from pytz.exceptions import UnknownTimeZoneError
+
+from clickhouse_connect import common
+from clickhouse_connect.driver.common import dict_copy, empty_gen, StreamContext
+from clickhouse_connect.driver.external import ExternalData
+from clickhouse_connect.driver.types import Matrix, Closable
+from clickhouse_connect.json_impl import any_to_json
+from clickhouse_connect.driver.exceptions import StreamClosedError, ProgrammingError
+from clickhouse_connect.driver.options import check_arrow, pd_extended_dtypes
+from clickhouse_connect.driver.context import BaseQueryContext
+
+logger = logging.getLogger(__name__)
+commands = 'CREATE|ALTER|SYSTEM|GRANT|REVOKE|CHECK|DETACH|DROP|DELETE|KILL|' + \
+ 'OPTIMIZE|SET|RENAME|TRUNCATE|USE'
+
+limit_re = re.compile(r'\s+LIMIT($|\s)', re.IGNORECASE)
+select_re = re.compile(r'(^|\s)SELECT\s', re.IGNORECASE)
+insert_re = re.compile(r'(^|\s)INSERT\s*INTO', re.IGNORECASE)
+command_re = re.compile(r'(^\s*)(' + commands + r')\s', re.IGNORECASE)
+external_bind_re = re.compile(r'{.+:.+}')
+
+
+# pylint: disable=too-many-instance-attributes
+class QueryContext(BaseQueryContext):
+ """
+    Argument/parameter object for queries. This context is used to set thread/query-specific formats
+ """
+
+ # pylint: disable=duplicate-code,too-many-arguments,too-many-locals
+ def __init__(self,
+ query: str = '',
+ parameters: Optional[Dict[str, Any]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ query_formats: Optional[Dict[str, str]] = None,
+ column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
+ encoding: Optional[str] = None,
+ server_tz: tzinfo = pytz.UTC,
+ use_none: Optional[bool] = None,
+ column_oriented: Optional[bool] = None,
+ use_numpy: Optional[bool] = None,
+ max_str_len: Optional[int] = 0,
+ query_tz: Optional[Union[str, tzinfo]] = None,
+ column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
+ use_extended_dtypes: Optional[bool] = None,
+ as_pandas: bool = False,
+ streaming: bool = False,
+ apply_server_tz: bool = False,
+ external_data: Optional[ExternalData] = None):
+ """
+ Initializes various configuration settings for the query context
+
+ :param query: Query string with Python style format value replacements
+ :param parameters: Optional dictionary of substitution values
+ :param settings: Optional ClickHouse settings for the query
+ :param query_formats: Optional dictionary of query formats with the key of a ClickHouse type name
+ (with * wildcards) and a value of valid query formats for those types.
+ The value 'encoding' can be sent to change the expected encoding for this query, with a value of
+ the desired encoding such as `latin-1`
+ :param column_formats: Optional dictionary of column specific formats. The key is the column name,
+ The value is either the format for the data column (such as 'string' for a UUID column) or a
+ second level "format" dictionary of a ClickHouse type name and a value of query formats. This
+ secondary dictionary can be used for nested column types such as Tuples or Maps
+ :param encoding: Optional string encoding for this query, such as 'latin-1'
+ :param use_none: Use a Python None for ClickHouse NULL values in nullable columns. Otherwise the default
+ value of the column (such as 0 for numbers) will be returned in the result_set
+        :param max_str_len: Limit returned ClickHouse String values to this length, which allows a Numpy
+          structured array even with ClickHouse variable-length String columns. If 0, Numpy arrays for
+          String columns will always be object arrays
+        :param query_tz: Either a string or a pytz tzinfo object. (Strings will be converted to tzinfo objects).
+          Values for any DateTime or DateTime64 column in the query will be converted to Python datetime.datetime
+          objects with the selected timezone
+        :param column_tzs: A dictionary of column names to tzinfo objects (or strings that will be converted to
+          tzinfo objects). The timezone will be applied to datetime objects returned in the query
+ """
+ super().__init__(settings,
+ query_formats,
+ column_formats,
+ encoding,
+ use_extended_dtypes if use_extended_dtypes is not None else False,
+ use_numpy if use_numpy is not None else False)
+ self.query = query
+ self.parameters = parameters or {}
+ self.use_none = True if use_none is None else use_none
+ self.column_oriented = False if column_oriented is None else column_oriented
+ self.use_numpy = use_numpy
+ self.max_str_len = 0 if max_str_len is None else max_str_len
+ self.server_tz = server_tz
+ self.apply_server_tz = apply_server_tz
+ self.external_data = external_data
+ if isinstance(query_tz, str):
+ try:
+ query_tz = pytz.timezone(query_tz)
+ except UnknownTimeZoneError as ex:
+ raise ProgrammingError(f'query_tz {query_tz} is not recognized') from ex
+ self.query_tz = query_tz
+ if column_tzs is not None:
+ for col_name, timezone in column_tzs.items():
+ if isinstance(timezone, str):
+ try:
+ timezone = pytz.timezone(timezone)
+ column_tzs[col_name] = timezone
+ except UnknownTimeZoneError as ex:
+ raise ProgrammingError(f'column_tz {timezone} is not recognized') from ex
+ self.column_tzs = column_tzs
+ self.column_tz = None
+ self.response_tz = None
+ self.block_info = False
+ self.as_pandas = as_pandas
+ self.use_pandas_na = as_pandas and pd_extended_dtypes
+ self.streaming = streaming
+ self._update_query()
+
+ @property
+ def is_select(self) -> bool:
+ return select_re.search(self.uncommented_query) is not None
+
+ @property
+ def has_limit(self) -> bool:
+ return limit_re.search(self.uncommented_query) is not None
+
+ @property
+ def is_insert(self) -> bool:
+ return insert_re.search(self.uncommented_query) is not None
+
+ @property
+ def is_command(self) -> bool:
+ return command_re.search(self.uncommented_query) is not None
+
+ def set_parameters(self, parameters: Dict[str, Any]):
+ self.parameters = parameters
+ self._update_query()
+
+ def set_parameter(self, key: str, value: Any):
+ if not self.parameters:
+ self.parameters = {}
+ self.parameters[key] = value
+ self._update_query()
+
+ def set_response_tz(self, response_tz: tzinfo):
+ self.response_tz = response_tz
+
+ def start_column(self, name: str):
+ super().start_column(name)
+ if self.column_tzs and name in self.column_tzs:
+ self.column_tz = self.column_tzs[name]
+ else:
+ self.column_tz = None
+
+ def active_tz(self, datatype_tz: Optional[tzinfo]):
+ if self.column_tz:
+ active_tz = self.column_tz
+ elif datatype_tz:
+ active_tz = datatype_tz
+ elif self.query_tz:
+ active_tz = self.query_tz
+ elif self.response_tz:
+ active_tz = self.response_tz
+ elif self.apply_server_tz:
+ active_tz = self.server_tz
+ else:
+ active_tz = self.local_tz
+        # Special case: if everything is UTC, including the local timezone, we use naive datetimes
+        # for performance reasons
+ if active_tz == pytz.UTC and active_tz.utcoffset(datetime.now()) == self.local_tz.utcoffset(datetime.now()):
+ return None
+ return active_tz
+
+ def updated_copy(self,
+ query: Optional[str] = None,
+ parameters: Optional[Dict[str, Any]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ query_formats: Optional[Dict[str, str]] = None,
+ column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
+ encoding: Optional[str] = None,
+ server_tz: Optional[tzinfo] = None,
+ use_none: Optional[bool] = None,
+ column_oriented: Optional[bool] = None,
+ use_numpy: Optional[bool] = None,
+ max_str_len: Optional[int] = None,
+ query_tz: Optional[Union[str, tzinfo]] = None,
+ column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
+ use_extended_dtypes: Optional[bool] = None,
+ as_pandas: bool = False,
+ streaming: bool = False,
+ external_data: Optional[ExternalData] = None) -> 'QueryContext':
+ """
+        Creates a copy of this query context with the provided parameters overridden/updated as appropriate.
+ """
+ return QueryContext(query or self.query,
+ dict_copy(self.parameters, parameters),
+ dict_copy(self.settings, settings),
+ dict_copy(self.query_formats, query_formats),
+ dict_copy(self.column_formats, column_formats),
+ encoding if encoding else self.encoding,
+ server_tz if server_tz else self.server_tz,
+ self.use_none if use_none is None else use_none,
+ self.column_oriented if column_oriented is None else column_oriented,
+ self.use_numpy if use_numpy is None else use_numpy,
+ self.max_str_len if max_str_len is None else max_str_len,
+ self.query_tz if query_tz is None else query_tz,
+ self.column_tzs if column_tzs is None else column_tzs,
+ self.use_extended_dtypes if use_extended_dtypes is None else use_extended_dtypes,
+ as_pandas,
+ streaming,
+ self.apply_server_tz,
+ self.external_data if external_data is None else external_data)
+
+ def _update_query(self):
+ self.final_query, self.bind_params = bind_query(self.query, self.parameters, self.server_tz)
+ self.uncommented_query = remove_sql_comments(self.final_query)
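+
+# Illustrative usage of QueryContext (a sketch; contexts are normally built by the Client
+# rather than constructed directly, and the query shown here is assumed):
+#
+#     ctx = QueryContext(query='SELECT number FROM numbers(10) LIMIT %(limit)s',
+#                        parameters={'limit': 5})
+#     assert ctx.is_select and ctx.has_limit
+#     streaming_ctx = ctx.updated_copy(streaming=True)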
+
+
+class QueryResult(Closable):
+ """
+ Wrapper class for query return values and metadata
+ """
+
+ # pylint: disable=too-many-arguments
+ def __init__(self,
+ result_set: Matrix = None,
+ block_gen: Generator[Matrix, None, None] = None,
+ column_names: Tuple = (),
+ column_types: Tuple = (),
+ column_oriented: bool = False,
+ source: Closable = None,
+ query_id: str = None,
+ summary: Dict[str, Any] = None):
+ self._result_rows = result_set
+ self._result_columns = None
+ self._block_gen = block_gen or empty_gen()
+ self._in_context = False
+ self._query_id = query_id
+ self.column_names = column_names
+ self.column_types = column_types
+ self.column_oriented = column_oriented
+ self.source = source
+ self.summary = {} if summary is None else summary
+
+ @property
+ def result_set(self) -> Matrix:
+ if self.column_oriented:
+ return self.result_columns
+ return self.result_rows
+
+ @property
+ def result_columns(self) -> Matrix:
+ if self._result_columns is None:
+ result = [[] for _ in range(len(self.column_names))]
+ with self.column_block_stream as stream:
+ for block in stream:
+ for base, added in zip(result, block):
+ base.extend(added)
+ self._result_columns = result
+ return self._result_columns
+
+ @property
+ def result_rows(self) -> Matrix:
+ if self._result_rows is None:
+ result = []
+ with self.row_block_stream as stream:
+ for block in stream:
+ result.extend(block)
+ self._result_rows = result
+ return self._result_rows
+
+ @property
+ def query_id(self) -> str:
+ query_id = self.summary.get('query_id')
+ if query_id:
+ return query_id
+ return self._query_id
+
+ def _column_block_stream(self):
+ if self._block_gen is None:
+ raise StreamClosedError
+ block_stream = self._block_gen
+ self._block_gen = None
+ return block_stream
+
+ def _row_block_stream(self):
+ for block in self._column_block_stream():
+ yield list(zip(*block))
+
+ @property
+ def column_block_stream(self) -> StreamContext:
+ return StreamContext(self, self._column_block_stream())
+
+ @property
+ def row_block_stream(self):
+ return StreamContext(self, self._row_block_stream())
+
+ @property
+ def rows_stream(self) -> StreamContext:
+ def stream():
+ for block in self._row_block_stream():
+ for row in block:
+ yield row
+
+ return StreamContext(self, stream())
+
+ def named_results(self) -> Generator[dict, None, None]:
+ for row in zip(*self.result_set) if self.column_oriented else self.result_set:
+ yield dict(zip(self.column_names, row))
+
+ @property
+ def row_count(self) -> int:
+ if self.column_oriented:
+ return 0 if len(self.result_set) == 0 else len(self.result_set[0])
+ return len(self.result_set)
+
+ @property
+ def first_item(self):
+ if self.column_oriented:
+ return {name: col[0] for name, col in zip(self.column_names, self.result_set)}
+ return dict(zip(self.column_names, self.result_set[0]))
+
+ @property
+ def first_row(self):
+ if self.column_oriented:
+ return [col[0] for col in self.result_set]
+ return self.result_set[0]
+
+ def close(self):
+ if self.source:
+ self.source.close()
+ self.source = None
+ if self._block_gen is not None:
+ self._block_gen.close()
+ self._block_gen = None
+
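+# Illustrative usage of QueryResult (a sketch; assumes `client` is a connected Client
+# from clickhouse_connect.get_client(), whose query() method returns a QueryResult):
+#
+#     result = client.query('SELECT name, value FROM system.settings LIMIT 3')
+#     print(result.column_names)          # ('name', 'value')
+#     print(result.row_count)             # 3
+#     for row in result.named_results():
+#         print(row['name'], row['value'])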
+
+BS = '\\'
+must_escape = (BS, '\'')
+
+
+def quote_identifier(identifier: str):
+ first_char = identifier[0]
+ if first_char in ('`', '"') and identifier[-1] == first_char:
+ # Identifier is already quoted, assume that it's valid
+ return identifier
+ return f'`{identifier}`'
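+
+# Illustrative usage (a sketch):
+#
+#     quote_identifier('my_table')    ->  `my_table`
+#     quote_identifier('`my_table`')  ->  `my_table`  (already quoted, returned unchanged)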
+
+
+def finalize_query(query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]],
+ server_tz: Optional[tzinfo] = None) -> str:
+ if not parameters:
+ return query
+ if hasattr(parameters, 'items'):
+ return query % {k: format_query_value(v, server_tz) for k, v in parameters.items()}
+ return query % tuple(format_query_value(v) for v in parameters)
+
+
+def bind_query(query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]],
+ server_tz: Optional[tzinfo] = None) -> Tuple[str, Dict[str, str]]:
+ if not parameters:
+ return query, {}
+ if external_bind_re.search(query) is None:
+ return finalize_query(query, parameters, server_tz), {}
+ return query, {f'param_{k}': format_bind_value(v, server_tz) for k, v in parameters.items()}
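+
+# Illustrative usage (a sketch; queries and values are assumed). Queries containing
+# server-side binding expressions of the form {name:Type} keep their parameters as
+# ClickHouse 'param_*' values, while other queries are bound client side using Python
+# %-style substitution:
+#
+#     bind_query('SELECT {v:String}', {'v': 'x'})   ->  ('SELECT {v:String}', {'param_v': 'x'})
+#     bind_query('SELECT %(v)s', {'v': 5})          ->  ('SELECT 5', {})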
+
+
+def format_str(value: str):
+ return f"'{escape_str(value)}'"
+
+
+def escape_str(value: str):
+ return ''.join(f'{BS}{c}' if c in must_escape else c for c in value)
+
+
+# pylint: disable=too-many-return-statements
+def format_query_value(value: Any, server_tz: tzinfo = pytz.UTC):
+ """
+ Format Python values in a ClickHouse query
+ :param value: Python object
+ :param server_tz: Server timezone for adjusting datetime values
+    :return: Literal string for the Python value
+ """
+ if value is None:
+ return 'NULL'
+ if isinstance(value, str):
+ return format_str(value)
+ if isinstance(value, datetime):
+ if value.tzinfo is None:
+ value = value.replace(tzinfo=server_tz)
+ return f"'{value.strftime('%Y-%m-%d %H:%M:%S')}'"
+ if isinstance(value, date):
+ return f"'{value.isoformat()}'"
+ if isinstance(value, list):
+ return f"[{', '.join(format_query_value(x, server_tz) for x in value)}]"
+ if isinstance(value, tuple):
+ return f"({', '.join(format_query_value(x, server_tz) for x in value)})"
+ if isinstance(value, dict):
+ if common.get_setting('dict_parameter_format') == 'json':
+ return format_str(any_to_json(value).decode())
+ pairs = [format_query_value(k, server_tz) + ':' + format_query_value(v, server_tz)
+ for k, v in value.items()]
+ return f"{{{', '.join(pairs)}}}"
+ if isinstance(value, Enum):
+ return format_query_value(value.value, server_tz)
+ if isinstance(value, (uuid.UUID, ipaddress.IPv4Address, ipaddress.IPv6Address)):
+ return f"'{value}'"
+ return str(value)
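+
+# Illustrative usage (a sketch):
+#
+#     format_query_value("it's")              ->  'it\'s'
+#     format_query_value(['a', None, 3])      ->  ['a', NULL, 3]
+#     format_query_value(date(2023, 1, 1))    ->  '2023-01-01'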
+
+
+# pylint: disable=too-many-branches
+def format_bind_value(value: Any, server_tz: tzinfo = pytz.UTC, top_level: bool = True):
+ """
+    Format Python values for ClickHouse server-side parameter binding
+ :param value: Python object
+ :param server_tz: Server timezone for adjusting datetime values
+    :param top_level: Flag indicating the value is at the top level rather than nested inside a container
+    :return: Literal string for the Python value
+ """
+
+ def recurse(x):
+ return format_bind_value(x, server_tz, False)
+
+ if value is None:
+ return '\\N'
+ if isinstance(value, str):
+ if top_level:
+            # At the top level, strings must not be surrounded by quotes
+ return escape_str(value)
+ return format_str(value)
+ if isinstance(value, datetime):
+ if value.tzinfo is None:
+ value = value.replace(tzinfo=server_tz)
+ val = value.strftime('%Y-%m-%d %H:%M:%S')
+ if top_level:
+ return val
+ return f"'{val}'"
+ if isinstance(value, date):
+ if top_level:
+ return value.isoformat()
+ return f"'{value.isoformat()}'"
+ if isinstance(value, list):
+ return f"[{', '.join(recurse(x) for x in value)}]"
+ if isinstance(value, tuple):
+ return f"({', '.join(recurse(x) for x in value)})"
+ if isinstance(value, dict):
+ if common.get_setting('dict_parameter_format') == 'json':
+ return any_to_json(value).decode()
+ pairs = [recurse(k) + ':' + recurse(v)
+ for k, v in value.items()]
+ return f"{{{', '.join(pairs)}}}"
+ if isinstance(value, Enum):
+ return recurse(value.value)
+ return str(value)
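+
+# Illustrative usage (a sketch). Top-level values are rendered without surrounding
+# quotes, while values nested inside containers are quoted:
+#
+#     format_bind_value('value')       ->  value
+#     format_bind_value(['a', 'b'])    ->  ['a', 'b']
+#     format_bind_value(None)          ->  \N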
+
+
+comment_re = re.compile(r"(\".*?\"|\'.*?\')|(/\*.*?\*/|(--\s)[^\n]*$)", re.MULTILINE | re.DOTALL)
+
+
+def remove_sql_comments(sql: str) -> str:
+ """
+    Remove SQL comments. This is useful to determine the type of SQL query, such as SELECT or INSERT, but we
+    don't fully trust it to correctly ignore weird quoted strings and other edge cases, so we always pass the
+    original SQL to ClickHouse (which uses a full-fledged AST/token parser)
+ :param sql: SQL query
+ :return: SQL Query without SQL comments
+ """
+
+ def replacer(match):
+ # if the 2nd group (capturing comments) is not None, it means we have captured a
+ # non-quoted, actual comment string, so return nothing to remove the comment
+ if match.group(2):
+ return ''
+ # Otherwise we've actually captured a quoted string, so return it
+ return match.group(1)
+
+ return comment_re.sub(replacer, sql)
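+
+# Illustrative usage (a sketch):
+#
+#     remove_sql_comments("SELECT 1 -- trailing comment\n/* block comment */ FROM system.one")
+#         ->  "SELECT 1 \n FROM system.one"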
+
+
+def to_arrow(content: bytes):
+ pyarrow = check_arrow()
+ reader = pyarrow.ipc.RecordBatchFileReader(content)
+ return reader.read_all()
+
+
+def arrow_buffer(table) -> Tuple[Sequence[str], bytes]:
+ pyarrow = check_arrow()
+ sink = pyarrow.BufferOutputStream()
+ with pyarrow.RecordBatchFileWriter(sink, table.schema) as writer:
+ writer.write(table)
+ return table.schema.names, sink.getvalue()
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/summary.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/summary.py
new file mode 100644
index 0000000000..ef152cad76
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/summary.py
@@ -0,0 +1,39 @@
+from typing import Optional
+
+from clickhouse_connect.datatypes.registry import get_from_name
+
+from clickhouse_connect.driver.query import QueryResult
+
+
+class QuerySummary:
+ summary = {}
+
+ def __init__(self, summary: Optional[dict] = None):
+ if summary is not None:
+ self.summary = summary
+
+ @property
+ def written_rows(self) -> int:
+ return int(self.summary.get('written_rows', 0))
+
+    @property
+    def written_bytes(self) -> int:
+        return int(self.summary.get('written_bytes', 0))
+
+    @property
+    def query_id(self) -> str:
+        return self.summary.get('query_id', '')
+
+ def as_query_result(self) -> QueryResult:
+ data = []
+ column_names = []
+ column_types = []
+ str_type = get_from_name('String')
+ int_type = get_from_name('Int64')
+ for key, value in self.summary.items():
+ column_names.append(key)
+ if value.isnumeric():
+ data.append(int(value))
+ column_types.append(int_type)
+ else:
+ data.append(value)
+ column_types.append(str_type)
+ return QueryResult([data], column_names=tuple(column_names), column_types=tuple(column_types))
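+
+# Illustrative usage (a sketch; assumes `client.insert` returns a QuerySummary and that
+# the table and data shown here exist/are valid):
+#
+#     summary = client.insert('my_table', [[1, 'a'], [2, 'b']], column_names=['id', 'val'])
+#     print(summary.written_rows, summary.written_bytes)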
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/tools.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/tools.py
new file mode 100644
index 0000000000..420686cd64
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/tools.py
@@ -0,0 +1,28 @@
+from typing import Optional, Sequence, Dict, Any
+
+from clickhouse_connect.driver import Client
+from clickhouse_connect.driver.summary import QuerySummary
+from clickhouse_connect.driver.query import quote_identifier
+
+
+def insert_file(client: Client,
+ table: str,
+ file_path: str,
+ fmt: Optional[str] = None,
+ column_names: Optional[Sequence[str]] = None,
+ database: Optional[str] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ compression: Optional[str] = None) -> QuerySummary:
+ full_table = f'{quote_identifier(database)}.{quote_identifier(table)}' if database else quote_identifier(table)
+ if not fmt:
+ fmt = 'CSV' if column_names else 'CSVWithNames'
+ if compression is None:
+ if file_path.endswith('.gzip') or file_path.endswith('.gz'):
+ compression = 'gzip'
+ with open(file_path, 'rb') as file:
+ return client.raw_insert(full_table,
+ column_names=column_names,
+ insert_block=file,
+ fmt=fmt,
+ settings=settings,
+ compression=compression)
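+
+# Illustrative usage (a sketch; assumes `client` is a connected Client and the file exists).
+# gzip compression is inferred from a .gz/.gzip suffix, and the format defaults to
+# CSVWithNames when no column names are supplied:
+#
+#     insert_file(client, 'events', '/data/events.csv.gz')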
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/transform.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/transform.py
new file mode 100644
index 0000000000..e781f63179
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/transform.py
@@ -0,0 +1,118 @@
+import logging
+from typing import Union
+
+from clickhouse_connect.datatypes import registry
+from clickhouse_connect.driver.common import write_leb128
+from clickhouse_connect.driver.exceptions import StreamCompleteException, StreamFailureError
+from clickhouse_connect.driver.insert import InsertContext
+from clickhouse_connect.driver.npquery import NumpyResult
+from clickhouse_connect.driver.query import QueryResult, QueryContext
+from clickhouse_connect.driver.types import ByteSource
+from clickhouse_connect.driver.compression import get_compressor
+
+_EMPTY_CTX = QueryContext()
+
+logger = logging.getLogger(__name__)
+
+
+class NativeTransform:
+ # pylint: disable=too-many-locals
+ @staticmethod
+ def parse_response(source: ByteSource, context: QueryContext = _EMPTY_CTX) -> Union[NumpyResult, QueryResult]:
+ names = []
+ col_types = []
+ block_num = 0
+
+ def get_block():
+ nonlocal block_num
+ result_block = []
+ try:
+ try:
+ if context.block_info:
+ source.read_bytes(8)
+ num_cols = source.read_leb128()
+ except StreamCompleteException:
+ return None
+ num_rows = source.read_leb128()
+ for col_num in range(num_cols):
+ name = source.read_leb128_str()
+ type_name = source.read_leb128_str()
+ if block_num == 0:
+ names.append(name)
+ col_type = registry.get_from_name(type_name)
+ col_types.append(col_type)
+ else:
+ col_type = col_types[col_num]
+ if num_rows == 0:
+ result_block.append(tuple())
+ else:
+ context.start_column(name)
+ column = col_type.read_column(source, num_rows, context)
+ result_block.append(column)
+ except Exception as ex:
+ source.close()
+ if isinstance(ex, StreamCompleteException):
+                    # We ran out of data before it was expected; this could be ClickHouse reporting an error
+                    # in the response
+ message = source.last_message
+ if len(message) > 1024:
+ message = message[-1024:]
+ error_start = message.find('Code: ')
+ if error_start != -1:
+ message = message[error_start:]
+ raise StreamFailureError(message) from None
+ raise
+ block_num += 1
+ return result_block
+
+ first_block = get_block()
+ if first_block is None:
+ return NumpyResult() if context.use_numpy else QueryResult([])
+
+ def gen():
+ yield first_block
+ while True:
+ next_block = get_block()
+ if next_block is None:
+ return
+ yield next_block
+
+ if context.use_numpy:
+ res_types = [col.dtype if hasattr(col, 'dtype') else 'O' for col in first_block]
+ return NumpyResult(gen(), tuple(names), tuple(col_types), res_types, source)
+ return QueryResult(None, gen(), tuple(names), tuple(col_types), context.column_oriented, source)
+
+ @staticmethod
+ def build_insert(context: InsertContext):
+ compressor = get_compressor(context.compression)
+
+ def chunk_gen():
+ for block in context.next_block():
+ output = bytearray()
+ output += block.prefix
+ write_leb128(block.column_count, output)
+ write_leb128(block.row_count, output)
+ for col_name, col_type, data in zip(block.column_names, block.column_types, block.column_data):
+ write_leb128(len(col_name), output)
+ output += col_name.encode()
+ write_leb128(len(col_type.name), output)
+ output += col_type.name.encode()
+ context.start_column(col_name)
+ try:
+ col_type.write_column(data, output, context)
+ except Exception as ex: # pylint: disable=broad-except
+ # This is hideous, but some low level serializations can fail while streaming
+ # the insert if the user has included bad data in the column. We need to ensure that the
+ # insert fails (using garbage data) to avoid a partial insert, and use the context to
+ # propagate the correct exception to the user
+                        logger.error('Error serializing column `%s` into data type `%s`',
+ col_name, col_type.name, exc_info=True)
+ context.insert_exception = ex
+ yield 'INTERNAL EXCEPTION WHILE SERIALIZING'.encode()
+ return
+ yield compressor.compress_block(output)
+ footer = compressor.flush()
+ if footer:
+ yield footer
+
+ return chunk_gen()
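+
+# For reference, each chunk yielded by chunk_gen above is a (possibly compressed) block in
+# the ClickHouse Native format, laid out as:
+#
+#     <block prefix> <LEB128 column count> <LEB128 row count>
+#     then, per column:
+#         <LEB128 name length> <name bytes> <LEB128 type length> <type bytes> <column data>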
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/types.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/types.py
new file mode 100644
index 0000000000..015e162fbe
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/types.py
@@ -0,0 +1,50 @@
+from abc import ABC, abstractmethod
+from typing import Sequence, Any
+
+Matrix = Sequence[Sequence[Any]]
+
+
+class Closable(ABC):
+ @abstractmethod
+ def close(self):
+ pass
+
+
+class ByteSource(Closable):
+ last_message = None
+
+ @abstractmethod
+ def read_leb128(self) -> int:
+ pass
+
+ @abstractmethod
+ def read_leb128_str(self) -> str:
+ pass
+
+ @abstractmethod
+ def read_uint64(self) -> int:
+ pass
+
+ @abstractmethod
+ def read_bytes(self, sz: int) -> bytes:
+ pass
+
+ @abstractmethod
+ def read_str_col(self, num_rows: int, encoding: str, nullable: bool = False, null_obj: Any = None):
+ pass
+
+ @abstractmethod
+ def read_bytes_col(self, sz: int, num_rows: int):
+ pass
+
+ @abstractmethod
+ def read_fixed_str_col(self, sz: int, num_rows: int, encoding: str):
+ pass
+
+ @abstractmethod
+ def read_array(self, array_type: str, num_rows: int):
+ pass
+
+ @abstractmethod
+ def read_byte(self) -> int:
+ pass