diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-10-12 10:39:22 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-10-12 10:50:34 +0300 |
commit | b904316f22d889171e7fe39e8c467cbea5c6cdd6 (patch) | |
tree | 84c870d76f7278ad6a8d1c020a8b950edb7d531e /contrib/python/clickhouse-connect/clickhouse_connect | |
parent | e493167a2cecbdc68258d70ddb044e7a0f56aa7f (diff) | |
download | ydb-b904316f22d889171e7fe39e8c467cbea5c6cdd6.tar.gz |
Intermediate changes
commit_hash:3ed72e620c7eace6c8edd510ac2324e8acfcfafb
Diffstat (limited to 'contrib/python/clickhouse-connect/clickhouse_connect')
35 files changed, 658 insertions, 345 deletions
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py b/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py index d9d76edb88..c5e4522a93 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py @@ -1 +1 @@ -version = '0.7.19' +version = '0.8.0' diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/datatypes/base.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/datatypes/base.py index 0c0d25e6b0..ce73da1481 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/datatypes/base.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/datatypes/base.py @@ -5,7 +5,7 @@ from sqlalchemy.exc import CompileError from clickhouse_connect.datatypes.base import ClickHouseType, TypeDef, EMPTY_TYPE_DEF from clickhouse_connect.datatypes.registry import parse_name, type_map -from clickhouse_connect.driver.query import str_query_value +from clickhouse_connect.driver.binding import str_query_value logger = logging.getLogger(__name__) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/ddl/custom.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/ddl/custom.py index b7eee4ad7d..9f7c1e6401 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/ddl/custom.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/ddl/custom.py @@ -1,7 +1,7 @@ from sqlalchemy.sql.ddl import DDL from sqlalchemy.exc import ArgumentError -from clickhouse_connect.driver.query import quote_identifier +from clickhouse_connect.driver.binding import quote_identifier # pylint: disable=too-many-ancestors,abstract-method @@ -31,7 +31,7 @@ class CreateDatabase(DDL): super().__init__(stmt) -# pylint: disable=too-many-ancestors,abstract-method +# pylint: disable=too-many-ancestors,abstract-method class DropDatabase(DDL): """ Alternative DDL statement for built in SqlAlchemy DropSchema DDL class diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/dialect.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/dialect.py index 6b04c7e2fe..c9bb24c95a 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/dialect.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/dialect.py @@ -8,7 +8,7 @@ from clickhouse_connect.cc_sqlalchemy.sql import full_table from clickhouse_connect.cc_sqlalchemy.sql.ddlcompiler import ChDDLCompiler from clickhouse_connect.cc_sqlalchemy import ischema_names, dialect_name from clickhouse_connect.cc_sqlalchemy.sql.preparer import ChIdentifierPreparer -from clickhouse_connect.driver.query import quote_identifier, format_str +from clickhouse_connect.driver.binding import quote_identifier, format_str # pylint: disable=too-many-public-methods,no-self-use,unused-argument diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/__init__.py index 68becd54d6..00b9bc8c13 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/__init__.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/__init__.py @@ -2,7 +2,7 @@ from typing import Optional from sqlalchemy import Table -from clickhouse_connect.driver.query import quote_identifier +from clickhouse_connect.driver.binding import quote_identifier def full_table(table_name: str, schema: Optional[str] = None) -> str: diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/ddlcompiler.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/ddlcompiler.py index 5a97254705..8a2180c466 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/ddlcompiler.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/ddlcompiler.py @@ -2,7 +2,7 @@ from sqlalchemy import Column from sqlalchemy.sql.compiler import DDLCompiler from clickhouse_connect.cc_sqlalchemy.sql import format_table -from clickhouse_connect.driver.query import quote_identifier +from clickhouse_connect.driver.binding import quote_identifier class ChDDLCompiler(DDLCompiler): diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/preparer.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/preparer.py index a31b3e7af6..f53a2bde37 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/preparer.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/preparer.py @@ -1,6 +1,6 @@ from sqlalchemy.sql.compiler import IdentifierPreparer -from clickhouse_connect.driver.query import quote_identifier +from clickhouse_connect.driver.binding import quote_identifier class ChIdentifierPreparer(IdentifierPreparer): diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/common.py b/contrib/python/clickhouse-connect/clickhouse_connect/common.py index 1fda1ecfd7..ab960e76b7 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/common.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/common.py @@ -81,3 +81,6 @@ _init_common('send_os_user', (True, False), True) _init_common('use_protocol_version', (True, False), True) _init_common('max_error_size', (), 1024) + +# HTTP raw data buffer for streaming queries. This should not be reduced below 64KB to ensure compatibility with LZ4 compression +_init_common('http_buffer_size', (), 10 * 1024 * 1024) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/__init__.py index aa9b8c2f5d..e4229f66c3 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/__init__.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/__init__.py @@ -4,4 +4,6 @@ import clickhouse_connect.datatypes.numeric import clickhouse_connect.datatypes.special import clickhouse_connect.datatypes.string import clickhouse_connect.datatypes.temporal +import clickhouse_connect.datatypes.dynamic import clickhouse_connect.datatypes.registry +import clickhouse_connect.datatypes.postinit diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/base.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/base.py index b1990280eb..ab43b4e1db 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/base.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/base.py @@ -3,7 +3,7 @@ import logging from abc import ABC from math import log -from typing import NamedTuple, Dict, Type, Any, Sequence, MutableSequence, Optional, Union, Collection +from typing import NamedTuple, Dict, Type, Any, Sequence, MutableSequence, Union, Collection from clickhouse_connect.driver.common import array_type, int_size, write_array, write_uint64, low_card_version from clickhouse_connect.driver.context import BaseQueryContext @@ -94,6 +94,10 @@ class ClickHouseType(ABC): name = f'{wrapper}({name})' return name + @property + def insert_name(self): + return self.name + def data_size(self, sample: Sequence) -> int: if self.low_card: values = set(sample) @@ -104,10 +108,13 @@ class ClickHouseType(ABC): d_size += 1 return d_size - def _data_size(self, _sample: Collection) -> int: + def _data_size(self, sample: Collection) -> int: if self.byte_size: return self.byte_size - return 0 + total = 0 + for x in sample: + total += len(str(x)) + return total / len(sample) + 1 def write_column_prefix(self, dest: bytearray): """ @@ -119,7 +126,7 @@ class ClickHouseType(ABC): if self.low_card: write_uint64(low_card_version, dest) - def read_column_prefix(self, source: ByteSource): + def read_column_prefix(self, source: ByteSource, _ctx: QueryContext): """ Read the low cardinality version. Like the write method, this has to happen immediately for container classes :param source: The native protocol binary read buffer @@ -139,7 +146,7 @@ class ClickHouseType(ABC): :param ctx: QueryContext for query specific settings :return: The decoded column data as a sequence and the updated location pointer """ - self.read_column_prefix(source) + self.read_column_prefix(source, ctx) return self.read_column_data(source, num_rows, ctx) def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence: @@ -274,18 +281,11 @@ class ClickHouseType(ABC): write_uint64(len(index), dest) self._write_column_binary(index, dest, ctx) write_uint64(len(keys), dest) - write_array(array_type(1 << ix_type, False), keys, dest) + write_array(array_type(1 << ix_type, False), keys, dest, ctx.column_name) def _active_null(self, _ctx: QueryContext) -> Any: return None - def _first_value(self, column: Sequence) -> Optional[Any]: - if self.nullable: - return next((x for x in column if x is not None), None) - if len(column): - return column[0] - return None - EMPTY_TYPE_DEF = TypeDef() NULLABLE_TYPE_DEF = TypeDef(wrappers=('Nullable',)) @@ -338,7 +338,7 @@ class ArrayType(ClickHouseType, ABC, registered=False): def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext): if len(column) and self.nullable: column = [0 if x is None else x for x in column] - write_array(self._array_type, column, dest) + write_array(self._array_type, column, dest, ctx.column_name) def _active_null(self, ctx: QueryContext): if ctx.as_pandas and ctx.use_extended_dtypes: diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/container.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/container.py index 445b24140d..b8244eefdb 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/container.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/container.py @@ -3,11 +3,12 @@ import logging from typing import Sequence, Collection from clickhouse_connect.driver.insert import InsertContext -from clickhouse_connect.driver.query import QueryContext, quote_identifier +from clickhouse_connect.driver.query import QueryContext +from clickhouse_connect.driver.binding import quote_identifier from clickhouse_connect.driver.types import ByteSource from clickhouse_connect.json_impl import any_to_json from clickhouse_connect.datatypes.base import ClickHouseType, TypeDef -from clickhouse_connect.driver.common import must_swap +from clickhouse_connect.driver.common import must_swap, first_value from clickhouse_connect.datatypes.registry import get_from_name logger = logging.getLogger(__name__) @@ -22,8 +23,8 @@ class Array(ClickHouseType): self.element_type = get_from_name(type_def.values[0]) self._name_suffix = f'({self.element_type.name})' - def read_column_prefix(self, source: ByteSource): - return self.element_type.read_column_prefix(source) + def read_column_prefix(self, source: ByteSource, ctx:QueryContext): + return self.element_type.read_column_prefix(source, ctx) def _data_size(self, sample: Sequence) -> int: if len(sample) == 0: @@ -102,7 +103,7 @@ class Tuple(ClickHouseType): if len(sample) == 0: return 0 elem_size = 0 - is_dict = self.element_names and isinstance(self._first_value(list(sample)), dict) + is_dict = self.element_names and isinstance(first_value(list(sample), self.nullable), dict) for ix, e_type in enumerate(self.element_types): if e_type.byte_size > 0: elem_size += e_type.byte_size @@ -112,9 +113,9 @@ class Tuple(ClickHouseType): elem_size += e_type.data_size([x[ix] for x in sample]) return elem_size - def read_column_prefix(self, source: ByteSource): + def read_column_prefix(self, source: ByteSource, ctx: QueryContext): for e_type in self.element_types: - e_type.read_column_prefix(source) + e_type.read_column_prefix(source, ctx) def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext): columns = [] @@ -138,7 +139,7 @@ class Tuple(ClickHouseType): e_type.write_column_prefix(dest) def write_column_data(self, column: Sequence, dest: bytearray, ctx: InsertContext): - if self.element_names and isinstance(self._first_value(column), dict): + if self.element_names and isinstance(first_value(column, self.nullable), dict): columns = self.convert_dict_insert(column) else: columns = list(zip(*column)) @@ -180,9 +181,9 @@ class Map(ClickHouseType): total += self.value_type.data_size(x.values()) return total // len(sample) - def read_column_prefix(self, source: ByteSource): - self.key_type.read_column_prefix(source) - self.value_type.read_column_prefix(source) + def read_column_prefix(self, source: ByteSource, ctx: QueryContext): + self.key_type.read_column_prefix(source, ctx) + self.value_type.read_column_prefix(source, ctx) # pylint: disable=too-many-locals def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext): @@ -237,8 +238,8 @@ class Nested(ClickHouseType): array_sample = [[tuple(sub_row[key] for key in keys) for sub_row in row] for row in sample] return self.tuple_array.data_size(array_sample) - def read_column_prefix(self, source: ByteSource): - self.tuple_array.read_column_prefix(source) + def read_column_prefix(self, source: ByteSource, ctx:QueryContext): + self.tuple_array.read_column_prefix(source, ctx) def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext): keys = self.element_names @@ -252,64 +253,3 @@ class Nested(ClickHouseType): keys = self.element_names data = [[tuple(sub_row[key] for key in keys) for sub_row in row] for row in column] self.tuple_array.write_column_data(data, dest, ctx) - - -class JSON(ClickHouseType): - python_type = dict - # Native is a Python type (primitive, dict, array), string is an actual JSON string - valid_formats = 'string', 'native' - - def write_column_prefix(self, dest: bytearray): - dest.append(0x01) - - def _data_size(self, sample: Collection) -> int: - if len(sample) == 0: - return 0 - total = 0 - for x in sample: - if isinstance(x, str): - total += len(x) - elif x: - total += len(any_to_json(x)) - return total // len(sample) + 1 - - # pylint: disable=duplicate-code - def write_column_data(self, column: Sequence, dest: bytearray, ctx: InsertContext): - app = dest.append - first = self._first_value(column) - if isinstance(first, str) or self.write_format(ctx) == 'string': - for x in column: - v = x.encode() - sz = len(v) - while True: - b = sz & 0x7f - sz >>= 7 - if sz == 0: - app(b) - break - app(0x80 | b) - dest += v - else: - to_json = any_to_json - for x in column: - v = to_json(x) - sz = len(v) - while True: - b = sz & 0x7f - sz >>= 7 - if sz == 0: - app(b) - break - app(0x80 | b) - dest += v - - -class Object(JSON): - python_type = dict - - def __init__(self, type_def): - data_type = type_def.values[0].lower().replace(' ', '') - if data_type not in ("'json'", "nullable('json')"): - raise NotImplementedError('Only json or Nullable(json) Object type is currently supported') - super().__init__(type_def) - self._name_suffix = type_def.arg_str diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/dynamic.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/dynamic.py new file mode 100644 index 0000000000..e32145f1f8 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/dynamic.py @@ -0,0 +1,257 @@ +from typing import List, Sequence, Collection + +from clickhouse_connect.datatypes.base import ClickHouseType, TypeDef +from clickhouse_connect.datatypes.registry import get_from_name +from clickhouse_connect.driver.common import unescape_identifier, first_value +from clickhouse_connect.driver.ctypes import data_conv +from clickhouse_connect.driver.errors import handle_error +from clickhouse_connect.driver.exceptions import DataError +from clickhouse_connect.driver.insert import InsertContext +from clickhouse_connect.driver.query import QueryContext +from clickhouse_connect.driver.types import ByteSource +from clickhouse_connect.json_impl import any_to_json + +SHARED_DATA_TYPE: ClickHouseType +STRING_DATA_TYPE: ClickHouseType + +class Variant(ClickHouseType): + _slots = 'element_types' + python_type = object + + def __init__(self, type_def: TypeDef): + super().__init__(type_def) + self.element_types:List[ClickHouseType] = [get_from_name(name) for name in type_def.values] + self._name_suffix = f"({', '.join(ch_type.name for ch_type in self.element_types)})" + + @property + def insert_name(self): + return 'String' + + def read_column_prefix(self, source: ByteSource, ctx:QueryContext): + if source.read_uint64() != 0: + raise DataError(f'Unexpected discriminator format in Variant column {ctx.column_name}') + + def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence: + return read_variant_column(self.element_types, source, num_rows, ctx) + + def write_column_data(self, column: Sequence, dest: bytearray, ctx: InsertContext): + write_str_values(self, column, dest, ctx) + + +def read_variant_column(variant_types: List[ClickHouseType], source: ByteSource, num_rows:int, ctx: QueryContext) -> Sequence: + v_count = len(variant_types) + discriminators = source.read_array('B', num_rows) + # We have to count up how many of each discriminator there are in the block to read the sub columns correctly + disc_rows = [0] * v_count + for disc in discriminators: + if disc != 255: + disc_rows[disc] += 1 + sub_columns: List[Sequence] = [[]] * v_count + # Read all the sub-columns + for ix in range(v_count): + if disc_rows[ix] > 0: + sub_columns[ix] = variant_types[ix].read_column_data(source, disc_rows[ix], ctx) + # Now we have to walk through each of the discriminators again to assign the correct value from + # the sub-column to the final result column + sub_indexes = [0] * v_count + col = [] + app_col = col.append + for disc in discriminators: + if disc == 255: + app_col(None) + else: + app_col(sub_columns[disc][sub_indexes[disc]]) + sub_indexes[disc] += 1 + return col + + +class Dynamic(ClickHouseType): + python_type = object + + @property + def insert_name(self): + return 'String' + + def __init__(self, type_def:TypeDef): + super().__init__(type_def) + if type_def.keys and type_def.keys[0] == 'max_types': + self._name_suffix = f'(max_types={type_def.values[0]})' + + def read_column(self, source: ByteSource, num_rows:int, ctx:QueryContext): + variant_types = read_dynamic_prefix(source) + return read_variant_column(variant_types, source, num_rows, ctx) + + def write_column_data(self, column: Sequence, dest: bytearray, ctx: InsertContext): + write_str_values(self, column, dest, ctx) + + +def read_dynamic_prefix(source: ByteSource) -> List[ClickHouseType]: + if source.read_uint64() != 1: # dynamic structure serialization version, currently only 1 is recognized + raise DataError('Unrecognized dynamic structure version') + source.read_leb128() # max dynamic types, we ignore this value + num_variants = source.read_leb128() + variant_types = [get_from_name(source.read_leb128_str()) for _ in range(num_variants)] + variant_types.append(STRING_DATA_TYPE) + if source.read_uint64() != 0: # discriminator format, currently only 0 is recognized + raise DataError('Unexpected discriminator format in Variant column prefix') + return variant_types + + +def json_sample_size(_, sample: Collection) -> int: + if len(sample) == 0: + return 0 + total = 0 + for x in sample: + if isinstance(x, str): + total += len(x) + elif x: + total += len(any_to_json(x)) + return total // len(sample) + 1 + + +def write_json(ch_type:ClickHouseType, column: Sequence, dest: bytearray, ctx: InsertContext): + first = first_value(column, ch_type.nullable) + write_col = column + encoding = ctx.encoding or ch_type.encoding + if not isinstance(first, str) and ch_type.write_format(ctx) != 'string': + to_json = any_to_json + write_col = [to_json(v) for v in column] + encoding = None + handle_error(data_conv.write_str_col(write_col, ch_type.nullable, encoding, dest), ctx) + + +def write_str_values(ch_type:ClickHouseType, column: Sequence, dest: bytearray, ctx: InsertContext): + encoding = ctx.encoding or ch_type.encoding + col = [''] * len(column) + for ix, v in enumerate(column): + if v is None: + col[ix] = 'NULL' + else: + col[ix] = str(v) + handle_error(data_conv.write_str_col(col, False, encoding, dest), ctx) + + +class JSON(ClickHouseType): + _slots = 'typed_paths', 'typed_types' + python_type = dict + valid_formats = 'string', 'native' + _data_size = json_sample_size + write_column_data = write_json + shared_data_type: ClickHouseType + max_dynamic_paths = 0 + max_dynamic_types = 0 + typed_paths = [] + typed_types = [] + skips = [] + + def __init__(self, type_def:TypeDef): + super().__init__(type_def) + typed_paths = [] + typed_types = [] + skips = [] + parts = [] + for key, value in zip(type_def.keys, type_def.values): + if key == 'max_dynamic_paths': + try: + self.max_dynamic_paths = int(value) + parts.append(f'{key} = {value}') + continue + except ValueError: + pass + if key == 'max_dynamic_types': + try: + self.max_dynamic_types = int(value) + parts.append(f'{key} = {value}') + continue + except ValueError: + pass + if key == 'SKIP': + if value.startswith('REGEXP'): + value = 'REGEXP ' + value[6:] + else: + if not value.startswith("`"): + value = f'`{value}`' + skips.append(value) + else: + key = unescape_identifier(key) + typed_paths.append(key) + typed_types.append(get_from_name(value)) + key = f'`{key}`' + parts.append(f'{key} {value}') + if typed_paths: + self.typed_paths = typed_paths + self.typed_types = typed_types + if skips: + self.skips = skips + if parts: + self._name_suffix = f'({", ".join(parts)})' + + @property + def insert_name(self): + return 'String' + + # pylint: disable=too-many-locals + def read_column(self, source: ByteSource, num_rows: int, ctx: QueryContext): + if source.read_uint64() != 0: # object serialization version, currently only 0 is recognized + raise DataError(f'unrecognized object serialization version, column `{ctx.column_name}`') + source.read_leb128() # the max number of dynamic paths. Used to preallocate storage in ClickHouse; we ignore it + dynamic_path_cnt = source.read_leb128() + dynamic_paths = [source.read_leb128_str() for _ in range(dynamic_path_cnt)] + for typed in self.typed_types: + typed.read_column_prefix(source, ctx) + dynamic_variants = [read_dynamic_prefix(source) for _ in range(dynamic_path_cnt)] + # C++ prefix read ends here + + typed_columns = [ch_type.read_column_data(source, num_rows, ctx) for ch_type in self.typed_types] + dynamic_columns = [read_variant_column(dynamic_variants[ix], source, num_rows, ctx) for ix in range(dynamic_path_cnt)] + SHARED_DATA_TYPE.read_column_data(source, num_rows, ctx) + col = [] + for row_num in range(num_rows): + top = {} + for ix, field in enumerate(self.typed_paths): + value = typed_columns[ix][row_num] + item = top + chain = field.split('.') + for key in chain[:-1]: + child = item.get(key) + if child is None: + child = {} + item[key] = child + item = child + item[chain[-1]] = value + for ix, field in enumerate(dynamic_paths): + value = dynamic_columns[ix][row_num] + if value is None: + continue + item = top + chain = field.split('.') + for key in chain[:-1]: + child = item.get(key) + if child is None: + child = {} + item[key] = child + item = child + item[chain[-1]] = value + col.append(top) + if self.read_format(ctx) == 'string': + return [any_to_json(v) for v in col] + return col + + +# Note that this type is deprecated and should not be used, it included for temporary backward compatibility only +class Object(ClickHouseType): + python_type = dict + # Native is a Python type (primitive, dict, array), string is an actual JSON string + valid_formats = 'string', 'native' + _data_size = json_sample_size + write_column_data = write_json + + def __init__(self, type_def): + data_type = type_def.values[0].lower().replace(' ', '') + if data_type not in ("'json'", "nullable('json')"): + raise NotImplementedError('Only json or Nullable(json) Object type is currently supported') + super().__init__(type_def) + self._name_suffix = type_def.arg_str + + def write_column_prefix(self, dest: bytearray): + dest.append(0x01) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/network.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/network.py index 14b7bc3b9a..f44686367d 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/network.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/network.py @@ -3,7 +3,7 @@ from ipaddress import IPv4Address, IPv6Address from typing import Union, MutableSequence, Sequence from clickhouse_connect.datatypes.base import ClickHouseType -from clickhouse_connect.driver.common import write_array, int_size +from clickhouse_connect.driver.common import write_array, int_size, first_value from clickhouse_connect.driver.insert import InsertContext from clickhouse_connect.driver.query import QueryContext from clickhouse_connect.driver.types import ByteSource @@ -29,7 +29,7 @@ class IPv4(ClickHouseType): return data_conv.read_ipv4_col(source, num_rows) def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext): - first = self._first_value(column) + first = first_value(column, self.nullable) if isinstance(first, str): fixed = 24, 16, 8, 0 # pylint: disable=consider-using-generator @@ -39,7 +39,7 @@ class IPv4(ClickHouseType): column = [x._ip if x else 0 for x in column] else: column = [x._ip for x in column] - write_array(self._array_type, column, dest) + write_array(self._array_type, column, dest, ctx.column_name) def _active_null(self, ctx: QueryContext): fmt = self.read_format(ctx) @@ -103,7 +103,7 @@ class IPv6(ClickHouseType): def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext): v = V6_NULL - first = self._first_value(column) + first = first_value(column, self.nullable) v4mask = IPV4_V6_MASK af6 = socket.AF_INET6 tov6 = socket.inet_pton diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/numeric.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/numeric.py index 2cd9e5f105..0ff62b1dc2 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/numeric.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/numeric.py @@ -4,7 +4,7 @@ from typing import Union, Type, Sequence, MutableSequence from math import nan from clickhouse_connect.datatypes.base import TypeDef, ArrayType, ClickHouseType -from clickhouse_connect.driver.common import array_type, write_array, decimal_size, decimal_prec +from clickhouse_connect.driver.common import array_type, write_array, decimal_size, decimal_prec, first_value from clickhouse_connect.driver.ctypes import numpy_conv, data_conv from clickhouse_connect.driver.insert import InsertContext from clickhouse_connect.driver.options import pd, np @@ -98,7 +98,7 @@ class BigInt(ClickHouseType, registered=False): def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext): if len(column) == 0: return - first = self._first_value(column) + first = first_value(column, self.nullable) sz = self.byte_size signed = self._signed empty = bytes(b'\x00' * sz) @@ -189,8 +189,8 @@ class Bool(ClickHouseType): return np.array(column) return column - def _write_column_binary(self, column, dest, _ctx): - write_array('B', [1 if x else 0 for x in column], dest) + def _write_column_binary(self, column, dest, ctx): + write_array('B', [1 if x else 0 for x in column], dest, ctx.column_name) class Boolean(Bool): @@ -218,15 +218,15 @@ class Enum(ClickHouseType): lookup = self._int_map.get return [lookup(x, None) for x in column] - def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, _ctx): - first = self._first_value(column) + def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx:InsertContext): + first = first_value(column, self.nullable) if first is None or not isinstance(first, str): if self.nullable: column = [0 if not x else x for x in column] - write_array(self._array_type, column, dest) + write_array(self._array_type, column, dest, ctx.column_name) else: lookup = self._name_map.get - write_array(self._array_type, [lookup(x, 0) for x in column], dest) + write_array(self._array_type, [lookup(x, 0) for x in column], dest, ctx.column_name) class Enum8(Enum): @@ -285,15 +285,15 @@ class Decimal(ClickHouseType): app(dec(f'-{digits[:-scale]}.{digits[-scale:]}')) return new_col - def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, _ctx): - with decimal.localcontext() as ctx: - ctx.prec = self.prec + def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx:InsertContext): + with decimal.localcontext() as dec_ctx: + dec_ctx.prec = self.prec dec = decimal.Decimal mult = self._mult if self.nullable: - write_array(self._array_type, [int(dec(str(x)) * mult) if x else 0 for x in column], dest) + write_array(self._array_type, [int(dec(str(x)) * mult) if x else 0 for x in column], dest, ctx.column_name) else: - write_array(self._array_type, [int(dec(str(x)) * mult) for x in column], dest) + write_array(self._array_type, [int(dec(str(x)) * mult) for x in column], dest, ctx.column_name) def _active_null(self, ctx: QueryContext): if ctx.use_none: diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/postinit.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/postinit.py new file mode 100644 index 0000000000..3536c65195 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/postinit.py @@ -0,0 +1,4 @@ +from clickhouse_connect.datatypes import registry, dynamic + +dynamic.SHARED_DATA_TYPE = registry.get_from_name('Array(String, String)') +dynamic.STRING_DATA_TYPE = registry.get_from_name('String') diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/registry.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/registry.py index 52d1036787..6fb20d8af6 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/registry.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/registry.py @@ -35,6 +35,12 @@ def parse_name(name: str) -> Tuple[str, str, TypeDef]: elif base.startswith('Tuple'): keys, values = parse_columns(base[5:]) base = 'Tuple' + elif base.startswith('Variant'): + keys, values = parse_columns(base[7:]) + base = 'Variant' + elif base.startswith('JSON') and len(base) > 4 and base[4] == '(': + keys, values = parse_columns(base[4:]) + base = 'JSON' elif base == 'Point': values = ('Float64', 'Float64') else: diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/special.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/special.py index 29d28e3378..9d296d00fc 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/special.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/special.py @@ -3,6 +3,7 @@ from uuid import UUID as PYUUID from clickhouse_connect.datatypes.base import TypeDef, ClickHouseType, ArrayType, UnsupportedType from clickhouse_connect.datatypes.registry import get_from_name +from clickhouse_connect.driver.common import first_value from clickhouse_connect.driver.ctypes import data_conv from clickhouse_connect.driver.insert import InsertContext from clickhouse_connect.driver.query import QueryContext @@ -37,7 +38,7 @@ class UUID(ClickHouseType): # pylint: disable=too-many-branches def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext): - first = self._first_value(column) + first = first_value(column, self.nullable) empty = empty_uuid_b if isinstance(first, str) or self.write_format(ctx) == 'string': for v in column: @@ -92,8 +93,8 @@ class SimpleAggregateFunction(ClickHouseType): def _data_size(self, sample: Sequence) -> int: return self.element_type.data_size(sample) - def read_column_prefix(self, source: ByteSource): - return self.element_type.read_column_prefix(source) + def read_column_prefix(self, source: ByteSource, ctx: QueryContext): + return self.element_type.read_column_prefix(source, ctx) def write_column_prefix(self, dest: bytearray): self.element_type.write_column_prefix(dest) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/string.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/string.py index 3cfcd38630..4b7886510d 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/string.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/string.py @@ -1,10 +1,10 @@ from typing import Sequence, MutableSequence, Union, Collection +from clickhouse_connect.driver.common import first_value from clickhouse_connect.driver.ctypes import data_conv from clickhouse_connect.datatypes.base import ClickHouseType, TypeDef from clickhouse_connect.driver.errors import handle_error -from clickhouse_connect.driver.exceptions import DataError from clickhouse_connect.driver.insert import InsertContext from clickhouse_connect.driver.query import QueryContext from clickhouse_connect.driver.types import ByteSource @@ -45,9 +45,9 @@ class String(ClickHouseType): def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext): encoding = None - if not isinstance(self._first_value(column), bytes): + if not isinstance(first_value(column, self.nullable), bytes): encoding = ctx.encoding or self.encoding - handle_error(data_conv.write_str_col(column, self.nullable, encoding, dest)) + handle_error(data_conv.write_str_col(column, self.nullable, encoding, dest), ctx) def _active_null(self, ctx): if ctx.use_none: @@ -92,7 +92,7 @@ class FixedString(ClickHouseType): empty = bytes((0,) * sz) str_enc = str.encode enc = ctx.encoding or self.encoding - first = self._first_value(column) + first = first_value(column, self.nullable) if isinstance(first, str) or self.write_format(ctx) == 'string': if self.nullable: for x in column: @@ -104,7 +104,7 @@ class FixedString(ClickHouseType): except UnicodeEncodeError: b = empty if len(b) > sz: - raise DataError(f'UTF-8 encoded FixedString value {b.hex(" ")} exceeds column size {sz}') + raise ctx.data_error(f'UTF-8 encoded FixedString value {b.hex(" ")} exceeds column size {sz}') ext(b) ext(empty[:sz - len(b)]) else: @@ -114,7 +114,7 @@ class FixedString(ClickHouseType): except UnicodeEncodeError: b = empty if len(b) > sz: - raise DataError(f'UTF-8 encoded FixedString value {b.hex(" ")} exceeds column size {sz}') + raise ctx.data_error(f'UTF-8 encoded FixedString value {b.hex(" ")} exceeds column size {sz}') ext(b) ext(empty[:sz - len(b)]) elif self.nullable: @@ -122,11 +122,11 @@ class FixedString(ClickHouseType): if not b: ext(empty) elif len(b) != sz: - raise DataError(f'Fixed String binary value {b.hex(" ")} does not match column size {sz}') + raise ctx.data_error(f'Fixed String binary value {b.hex(" ")} does not match column size {sz}') else: ext(b) else: for b in column: if len(b) != sz: - raise DataError(f'Fixed String binary value {b.hex(" ")} does not match column size {sz}') + raise ctx.data_error(f'Fixed String binary value {b.hex(" ")} does not match column size {sz}') ext(b) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/temporal.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/temporal.py index da672823b6..6359d5ba55 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/temporal.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/temporal.py @@ -4,7 +4,7 @@ from datetime import date, datetime, tzinfo from typing import Union, Sequence, MutableSequence from clickhouse_connect.datatypes.base import TypeDef, ClickHouseType -from clickhouse_connect.driver.common import write_array, np_date_types, int_size +from clickhouse_connect.driver.common import write_array, np_date_types, int_size, first_value from clickhouse_connect.driver.exceptions import ProgrammingError from clickhouse_connect.driver.ctypes import data_conv, numpy_conv from clickhouse_connect.driver.insert import InsertContext @@ -32,7 +32,7 @@ class Date(ClickHouseType): return data_conv.read_date_col(source, num_rows) def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext): - first = self._first_value(column) + first = first_value(column, self.nullable) if isinstance(first, int) or self.write_format(ctx) == 'int': if self.nullable: column = [x if x else 0 for x in column] @@ -45,7 +45,7 @@ class Date(ClickHouseType): column = [0 if x is None else (x - esd).days for x in column] else: column = [(x - esd).days for x in column] - write_array(self._array_type, column, dest) + write_array(self._array_type, column, dest, ctx.column_name) def _active_null(self, ctx: QueryContext): fmt = self.read_format(ctx) @@ -127,7 +127,7 @@ class DateTime(DateTimeBase): return data_conv.read_datetime_col(source, num_rows, active_tz) def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext): - first = self._first_value(column) + first = first_value(column, self.nullable) if isinstance(first, int) or self.write_format(ctx) == 'int': if self.nullable: column = [x if x else 0 for x in column] @@ -136,7 +136,7 @@ class DateTime(DateTimeBase): column = [int(x.timestamp()) if x else 0 for x in column] else: column = [int(x.timestamp()) for x in column] - write_array(self._array_type, column, dest) + write_array(self._array_type, column, dest, ctx.column_name) class DateTime64(DateTimeBase): @@ -202,7 +202,7 @@ class DateTime64(DateTimeBase): return new_col def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext): - first = self._first_value(column) + first = first_value(column, self.nullable) if isinstance(first, int) or self.write_format(ctx) == 'int': if self.nullable: column = [x if x else 0 for x in column] @@ -213,4 +213,4 @@ class DateTime64(DateTimeBase): for x in column] else: column = [((int(x.timestamp()) * 1000000 + x.microsecond) * prec) // 1000000 for x in column] - write_array('q', column, dest) + write_array('q', column, dest, ctx.column_name) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py index 27e16733d8..fe3e4b3fbc 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py @@ -84,7 +84,7 @@ def create_client(*, database = database or parsed.path for k, v in parse_qs(parsed.query).items(): kwargs[k] = v[0] - use_tls = str(secure).lower() == 'true' or interface == 'https' or (not interface and port in (443, 8443)) + use_tls = str(secure).lower() == 'true' or interface == 'https' or (not interface and str(port) in ('443', '8443')) if not host: host = 'localhost' if not interface: diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py index e2dc5b0118..bf56c9783f 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py @@ -13,7 +13,7 @@ from clickhouse_connect.datatypes.base import ClickHouseType from clickhouse_connect.driver.insert import InsertContext -# pylint: disable=too-many-public-methods, too-many-instance-attributes, too-many-arguments, too-many-locals +# pylint: disable=too-many-public-methods,too-many-instance-attributes,too-many-arguments,too-many-positional-arguments,too-many-locals class AsyncClient: """ AsyncClient is a wrapper around the ClickHouse Client object that allows for async calls to the ClickHouse server. diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/binding.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/binding.py new file mode 100644 index 0000000000..24522b0880 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/binding.py @@ -0,0 +1,199 @@ +import ipaddress +import re +import uuid +from datetime import tzinfo, datetime, date +from enum import Enum +from typing import Optional, Union, Sequence, Dict, Any, Tuple + +import pytz + +from clickhouse_connect import common +from clickhouse_connect.driver.common import dict_copy +from clickhouse_connect.json_impl import any_to_json + +BS = '\\' +must_escape = (BS, '\'', '`', '\t', '\n') +external_bind_re = re.compile(r'{.+:.+}') + + +class DT64Param: + def __init__(self, value: datetime): + self.value = value + + def format(self, tz: tzinfo, top_level:bool) -> str: + value = self.value + if tz: + value = value.astimezone(tz) + s = value.strftime('%Y-%m-%d %H:%M:%S.%f') + if top_level: + return s + return f"'{s}'" + + +def quote_identifier(identifier: str): + first_char = identifier[0] + if first_char in ('`', '"') and identifier[-1] == first_char: + # Identifier is already quoted, assume that it's valid + return identifier + return f'`{escape_str(identifier)}`' + + +def finalize_query(query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]], + server_tz: Optional[tzinfo] = None) -> str: + while query.endswith(';'): + query = query[:-1] + if not parameters: + return query + if hasattr(parameters, 'items'): + return query % {k: format_query_value(v, server_tz) for k, v in parameters.items()} + return query % tuple(format_query_value(v, server_tz) for v in parameters) + + +# pylint: disable=too-many-locals,too-many-branches +def bind_query(query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]], + server_tz: Optional[tzinfo] = None) -> Tuple[str, Dict[str, str]]: + while query.endswith(';'): + query = query[:-1] + if not parameters: + return query, {} + + binary_binds = None + + if isinstance(parameters, dict): + params_copy = dict_copy(parameters) + binary_binds = {k: v for k, v in params_copy.items() if k.startswith('$') and k.endswith('$') and len(k) > 1} + for key in binary_binds.keys(): + del params_copy[key] + + final_params = {} + for k, v in params_copy.items(): + if k.endswith('_64'): + if isinstance(v, datetime): + k = k[:-3] + v = DT64Param(v) + elif isinstance(v, list) and len(v) > 0 and isinstance(v[0], datetime): + k = k[:-3] + v = [DT64Param(x) for x in v] + final_params[k] = v + if external_bind_re.search(query) is None: + query, bound_params = finalize_query(query, final_params, server_tz), {} + else: + bound_params = {f'param_{k}': format_bind_value(v, server_tz) for k, v in final_params.items()} + else: + query, bound_params = finalize_query(query, parameters, server_tz), {} + if binary_binds: + binary_query = query.encode() + binary_indexes = {} + for k, v in binary_binds.items(): + key = k.encode() + item_index = 0 + while True: + item_index = binary_query.find(key, item_index) + if item_index == -1: + break + binary_indexes[item_index + len(key)] = key, v + item_index += len(key) + query = b'' + start = 0 + for loc in sorted(binary_indexes.keys()): + key, value = binary_indexes[loc] + query += binary_query[start:loc] + value + key + start = loc + query += binary_query[start:] + return query, bound_params + + +def format_str(value: str): + return f"'{escape_str(value)}'" + + +def escape_str(value: str): + return ''.join(f'{BS}{c}' if c in must_escape else c for c in value) + + +# pylint: disable=too-many-return-statements +def format_query_value(value: Any, server_tz: tzinfo = pytz.UTC): + """ + Format Python values in a ClickHouse query + :param value: Python object + :param server_tz: Server timezone for adjusting datetime values + :return: Literal string for python value + """ + if value is None: + return 'NULL' + if isinstance(value, str): + return format_str(value) + if isinstance(value, DT64Param): + return value.format(server_tz, False) + if isinstance(value, datetime): + if value.tzinfo is not None or server_tz != pytz.UTC: + value = value.astimezone(server_tz) + return f"'{value.strftime('%Y-%m-%d %H:%M:%S')}'" + if isinstance(value, date): + return f"'{value.isoformat()}'" + if isinstance(value, list): + return f"[{', '.join(str_query_value(x, server_tz) for x in value)}]" + if isinstance(value, tuple): + return f"({', '.join(str_query_value(x, server_tz) for x in value)})" + if isinstance(value, dict): + if common.get_setting('dict_parameter_format') == 'json': + return format_str(any_to_json(value).decode()) + pairs = [str_query_value(k, server_tz) + ':' + str_query_value(v, server_tz) + for k, v in value.items()] + return f"{{{', '.join(pairs)}}}" + if isinstance(value, Enum): + return format_query_value(value.value, server_tz) + if isinstance(value, (uuid.UUID, ipaddress.IPv4Address, ipaddress.IPv6Address)): + return f"'{value}'" + return value + + +def str_query_value(value: Any, server_tz: tzinfo = pytz.UTC): + return str(format_query_value(value, server_tz)) + + +# pylint: disable=too-many-branches +def format_bind_value(value: Any, server_tz: tzinfo = pytz.UTC, top_level: bool = True): + """ + Format Python values in a ClickHouse query + :param value: Python object + :param server_tz: Server timezone for adjusting datetime values + :param top_level: Flag for top level for nested structures + :return: Literal string for python value + """ + + def recurse(x): + return format_bind_value(x, server_tz, False) + + if value is None: + return '\\N' + if isinstance(value, str): + if top_level: + # At the top levels, strings must not be surrounded by quotes + return escape_str(value) + return format_str(value) + if isinstance(value, DT64Param): + return value.format(server_tz, top_level) + if isinstance(value, datetime): + value = value.astimezone(server_tz) + val = value.strftime('%Y-%m-%d %H:%M:%S') + if top_level: + return val + return f"'{val}'" + if isinstance(value, date): + if top_level: + return value.isoformat() + return f"'{value.isoformat()}'" + if isinstance(value, list): + return f"[{', '.join(recurse(x) for x in value)}]" + if isinstance(value, tuple): + return f"({', '.join(recurse(x) for x in value)})" + if isinstance(value, dict): + if common.get_setting('dict_parameter_format') == 'json': + return any_to_json(value).decode() + pairs = [recurse(k) + ':' + recurse(v) + for k, v in value.items()] + return f"{{{', '.join(pairs)}}}" + if isinstance(value, Enum): + return recurse(value.value) + return str(value) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py index ba3873cbd8..fe11c27883 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py @@ -20,15 +20,15 @@ from clickhouse_connect.driver.external import ExternalData from clickhouse_connect.driver.insert import InsertContext from clickhouse_connect.driver.summary import QuerySummary from clickhouse_connect.driver.models import ColumnDef, SettingDef, SettingStatus -from clickhouse_connect.driver.query import QueryResult, to_arrow, to_arrow_batches, QueryContext, arrow_buffer, \ - quote_identifier +from clickhouse_connect.driver.query import QueryResult, to_arrow, to_arrow_batches, QueryContext, arrow_buffer +from clickhouse_connect.driver.binding import quote_identifier io.DEFAULT_BUFFER_SIZE = 1024 * 256 logger = logging.getLogger(__name__) arrow_str_setting = 'output_format_arrow_string_as_string' -# pylint: disable=too-many-public-methods, too-many-instance-attributes +# pylint: disable=too-many-public-methods,too-many-arguments,too-many-positional-arguments,too-many-instance-attributes class Client(ABC): """ Base ClickHouse Connect client @@ -93,6 +93,8 @@ class Client(ABC): }) if test_data[8:16] == b'\x01\x01\x05check': self.protocol_version = PROTOCOL_VERSION_WITH_LOW_CARD + if self._setting_status('date_time_input_format').is_writable: + self.set_client_setting('date_time_input_format', 'best_effort') self.uri = uri def _validate_settings(self, settings: Optional[Dict[str, Any]]) -> Dict[str, str]: @@ -171,7 +173,7 @@ class Client(ABC): :return: The string value of the setting, if it exists, or None """ - # pylint: disable=too-many-arguments,unused-argument,too-many-locals + # pylint: disable=unused-argument,too-many-locals def query(self, query: Optional[str] = None, parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, @@ -303,7 +305,7 @@ class Client(ABC): :return: io.IOBase stream/iterator for the result """ - # pylint: disable=duplicate-code,too-many-arguments,unused-argument + # pylint: disable=duplicate-code,unused-argument def query_np(self, query: Optional[str] = None, parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, @@ -341,7 +343,7 @@ class Client(ABC): """ return self._context_query(locals(), use_numpy=True, streaming=True).np_stream - # pylint: disable=duplicate-code,too-many-arguments,unused-argument + # pylint: disable=duplicate-code,unused-argument def query_df(self, query: Optional[str] = None, parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, @@ -364,7 +366,7 @@ class Client(ABC): """ return self._context_query(locals(), use_numpy=True, as_pandas=True).df_result - # pylint: disable=duplicate-code,too-many-arguments,unused-argument + # pylint: disable=duplicate-code,unused-argument def query_df_stream(self, query: Optional[str] = None, parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, @@ -571,7 +573,6 @@ class Client(ABC): :return: ClickHouse server is up and reachable """ - # pylint: disable=too-many-arguments def insert(self, table: Optional[str] = None, data: Sequence[Sequence[Any]] = None, @@ -777,11 +778,18 @@ class Client(ABC): :param fmt: Valid clickhouse format """ + @abstractmethod def close(self): """ Subclass implementation to close the connection to the server/deallocate the client """ + @abstractmethod + def close_connections(self): + """ + Subclass implementation to disconnect all "re-used" client connections + """ + def _context_query(self, lcls: dict, **overrides): kwargs = lcls.copy() kwargs.pop('self') diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/common.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/common.py index dca0dc9317..404a09e867 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/common.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/common.py @@ -7,6 +7,7 @@ from typing import Sequence, MutableSequence, Dict, Optional, Union, Generator from clickhouse_connect.driver.exceptions import ProgrammingError, StreamClosedError, DataError from clickhouse_connect.driver.types import Closable + # pylint: disable=invalid-name must_swap = sys.byteorder == 'big' int_size = array.array('i').itemsize @@ -38,12 +39,13 @@ def array_type(size: int, signed: bool): return code if signed else code.upper() -def write_array(code: str, column: Sequence, dest: MutableSequence): +def write_array(code: str, column: Sequence, dest: MutableSequence, col_name: Optional[str]=None): """ Write a column of native Python data matching the array.array code :param code: Python array.array code matching the column data type :param column: Column of native Python values :param dest: Destination byte buffer + :param col_name: Optional column name for error tracking """ if len(column) and not isinstance(column[0], (int, float)): if code in ('f', 'F', 'd', 'D'): @@ -54,8 +56,11 @@ def write_array(code: str, column: Sequence, dest: MutableSequence): buff = struct.Struct(f'<{len(column)}{code}') dest += buff.pack(*column) except (TypeError, OverflowError, struct.error) as ex: - raise DataError('Unable to create Python array. This is usually caused by trying to insert None ' + - 'values into a ClickHouse column that is not Nullable') from ex + col_msg = '' + if col_name: + col_msg = f' for source column `{col_name}`' + raise DataError(f'Unable to create Python array{col_msg}. This is usually caused by trying to insert None ' + + 'values into a ClickHouse column that is not Nullable') from ex def write_uint64(value: int, dest: MutableSequence): @@ -134,6 +139,14 @@ def coerce_bool(val: Optional[Union[str, bool]]): return val is True or (isinstance(val, str) and val.lower() in ('true', '1', 'y', 'yes')) +def first_value(column: Sequence, nullable:bool = True): + if nullable: + return next((x for x in column if x is not None), None) + if len(column): + return column[0] + return None + + class SliceView(Sequence): """ Provides a view into a sequence rather than copying. Borrows liberally from diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/context.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/context.py index ad2b0d38d1..00125a0bdd 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/context.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/context.py @@ -36,6 +36,7 @@ class BaseQueryContext: for type_name, fmt in fmt.items()} self.query_formats = query_formats or {} self.column_formats = column_formats or {} + self.column_name = None self.encoding = encoding self.use_numpy = use_numpy self.use_extended_dtypes = use_extended_dtypes @@ -43,6 +44,7 @@ class BaseQueryContext: self._active_col_type_fmts = _empty_map def start_column(self, name: str): + self.column_name = name self._active_col_fmt = self.col_simple_formats.get(name) self._active_col_type_fmts = self.col_type_formats.get(name, _empty_map) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/ctypes.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/ctypes.py index e7bb607e68..39e5954c1f 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/ctypes.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/ctypes.py @@ -46,4 +46,4 @@ def connect_numpy(): str(ex)) -connect_c_modules() +# connect_c_modules() diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/errors.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/errors.py index 0d37890b7e..e5b3b6466e 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/errors.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/errors.py @@ -1,3 +1,4 @@ +from clickhouse_connect.driver.context import BaseQueryContext from clickhouse_connect.driver.exceptions import DataError @@ -8,6 +9,9 @@ NONE_IN_NULLABLE_COLUMN = 1 error_messages = {NONE_IN_NULLABLE_COLUMN: 'Invalid None value in non-Nullable column'} -def handle_error(error_num: int): +def handle_error(error_num: int, ctx: BaseQueryContext): if error_num > 0: - raise DataError(error_messages[error_num]) + msg = error_messages[error_num] + if ctx.column_name: + msg = f'{msg}, column name: `{ctx.column_name}`' + raise DataError(msg) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py index bf3c0d863f..001c578b26 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py @@ -24,7 +24,8 @@ from clickhouse_connect.driver.external import ExternalData from clickhouse_connect.driver.httputil import ResponseSource, get_pool_manager, get_response_data, \ default_pool_manager, get_proxy_manager, all_managers, check_env_proxy, check_conn_expiration from clickhouse_connect.driver.insert import InsertContext -from clickhouse_connect.driver.query import QueryResult, QueryContext, quote_identifier, bind_query +from clickhouse_connect.driver.query import QueryResult, QueryContext +from clickhouse_connect.driver.binding import quote_identifier, bind_query from clickhouse_connect.driver.summary import QuerySummary from clickhouse_connect.driver.transform import NativeTransform @@ -44,7 +45,7 @@ class HttpClient(Client): 'enable_http_compression'} _owns_pool_manager = False - # pylint: disable=too-many-arguments,too-many-locals,too-many-branches,too-many-statements,unused-argument + # pylint: disable=too-many-positional-arguments,too-many-arguments,too-many-locals,too-many-branches,too-many-statements,unused-argument def __init__(self, interface: str, host: str, @@ -70,7 +71,8 @@ class HttpClient(Client): server_host_name: Optional[str] = None, apply_server_timezone: Optional[Union[str, bool]] = None, show_clickhouse_errors: Optional[bool] = None, - autogenerate_session_id: Optional[bool] = None): + autogenerate_session_id: Optional[bool] = None, + tls_mode: Optional[str] = None): """ Create an HTTP ClickHouse Connect client See clickhouse_connect.get_client for parameters @@ -80,20 +82,20 @@ class HttpClient(Client): ch_settings = dict_copy(settings, self.params) self.http = pool_mgr if interface == 'https': + if isinstance(verify, str) and verify.lower() == 'proxy': + verify = True + tls_mode = tls_mode or 'proxy' if not https_proxy: https_proxy = check_env_proxy('https', host, port) - if https_proxy and isinstance(verify, str) and verify.lower() == 'proxy': - verify = 'proxy' - else: - verify = coerce_bool(verify) - if client_cert and verify != 'proxy': + verify = coerce_bool(verify) + if client_cert and (tls_mode is None or tls_mode == 'mutual'): if not username: raise ProgrammingError('username parameter is required for Mutual TLS authentication') self.headers['X-ClickHouse-User'] = username self.headers['X-ClickHouse-SSL-Certificate-Auth'] = 'on' # pylint: disable=too-many-boolean-expressions if not self.http and (server_host_name or ca_cert or client_cert or not verify or https_proxy): - options = {'verify': verify is not False} + options = {'verify': verify} dict_add(options, 'ca_cert', ca_cert) dict_add(options, 'client_cert', client_cert) dict_add(options, 'client_cert_key', client_cert_key) @@ -111,7 +113,7 @@ class HttpClient(Client): else: self.http = default_pool_manager() - if (not client_cert or verify == 'proxy') and username: + if (not client_cert or tls_mode in ('strict', 'proxy')) and username: self.headers['Authorization'] = 'Basic ' + b64encode(f'{username}:{password}'.encode()).decode() self.headers['User-Agent'] = common.build_client_name(client_name) self._read_format = self._write_format = 'Native' @@ -157,7 +159,7 @@ class HttpClient(Client): server_host_name=server_host_name, apply_server_timezone=apply_server_timezone, show_clickhouse_errors=show_clickhouse_errors) - self.params = self._validate_settings(ch_settings) + self.params = dict_copy(self.params, self._validate_settings(ch_settings)) comp_setting = self._setting_status('enable_http_compression') self._send_comp_setting = not comp_setting.is_set and comp_setting.is_writable if comp_setting.is_set or comp_setting.is_writable: @@ -195,7 +197,7 @@ class HttpClient(Client): context.block_info = True params.update(context.bind_params) params.update(self._validate_settings(context.settings)) - if columns_only_re.search(context.uncommented_query): + if not context.is_insert and columns_only_re.search(context.uncommented_query): response = self._raw_request(f'{context.final_query}\n FORMAT JSON', params, headers, retries=self.query_retries) json_result = json.loads(response.data) @@ -513,6 +515,9 @@ class HttpClient(Client): logger.debug('ping failed', exc_info=True) return False + def close_connections(self): + self.http.clear() + def close(self): if self._owns_pool_manager: self.http.clear() diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py index 9a2b835c65..7dd73114e9 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py @@ -6,7 +6,8 @@ import os import sys import socket import time -from typing import Dict, Any, Optional +from collections import deque +from typing import Dict, Any, Optional, Tuple, Callable import certifi import lz4.frame @@ -192,34 +193,56 @@ class ResponseSource: def __init__(self, response: HTTPResponse, chunk_size: int = 1024 * 1024): self.response = response compression = response.headers.get('content-encoding') + decompress:Optional[Callable] = None if compression == 'zstd': zstd_decom = zstandard.ZstdDecompressor().decompressobj() - def decompress(): - while True: - chunk = response.read(chunk_size, decode_content=False) - if not chunk: - break - yield zstd_decom.decompress(chunk) + def zstd_decompress(c: deque) -> Tuple[bytes, int]: + chunk = c.popleft() + return zstd_decom.decompress(chunk), len(chunk) - self.gen = decompress() + decompress = zstd_decompress elif compression == 'lz4': lz4_decom = lz4.frame.LZ4FrameDecompressor() - def decompress(): + def lz_decompress(c: deque) -> Tuple[Optional[bytes], int]: + read_amt = 0 while lz4_decom.needs_input: - data = self.response.read(chunk_size, decode_content=False) + data = c.popleft() + read_amt += len(data) if lz4_decom.unused_data: data = lz4_decom.unused_data + data - if not data: - return - chunk = lz4_decom.decompress(data) - if chunk: - yield chunk - - self.gen = decompress() - else: - self.gen = response.stream(amt=chunk_size, decode_content=True) + return lz4_decom.decompress(data), read_amt + return None, 0 + + decompress = lz_decompress + + buffer_size = common.get_setting('http_buffer_size') + + def buffered(): + chunks = deque() + done = False + current_size = 0 + read_gen = response.read_chunked(chunk_size, decompress is None) + while True: + while not done and current_size < buffer_size: + chunk = next(read_gen, None) + if not chunk: + done = True + break + chunks.append(chunk) + current_size += len(chunk) + if len(chunks) == 0: + return + if decompress: + chunk, used = decompress(chunks) + current_size -= used + else: + chunk = chunks.popleft() + current_size -= len(chunk) + yield chunk + + self.gen = buffered() def close(self): self.response.drain_conn() diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/insert.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/insert.py index 39a306cdc4..8ca1ef9f22 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/insert.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/insert.py @@ -2,12 +2,12 @@ import logging from math import log from typing import Iterable, Sequence, Optional, Any, Dict, NamedTuple, Generator, Union, TYPE_CHECKING -from clickhouse_connect.driver.query import quote_identifier +from clickhouse_connect.driver.binding import quote_identifier from clickhouse_connect.driver.ctypes import data_conv from clickhouse_connect.driver.context import BaseQueryContext from clickhouse_connect.driver.options import np, pd, pd_time_test -from clickhouse_connect.driver.exceptions import ProgrammingError +from clickhouse_connect.driver.exceptions import ProgrammingError, DataError if TYPE_CHECKING: from clickhouse_connect.datatypes.base import ClickHouseType @@ -198,3 +198,6 @@ class InsertContext(BaseQueryContext): data[ix] = data[ix].tolist() self.column_oriented = True return data + + def data_error(self, error_message: str) -> DataError: + return DataError(f"Failed to write column '{self.column_name}': {error_message}") diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/parser.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/parser.py index acdf9510e0..02bdc03cd5 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/parser.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/parser.py @@ -142,7 +142,7 @@ def parse_columns(expr: str): pos += 1 else: if level == 0: - if char == ' ': + if char in (' ', '='): if label and not named: names.append(unescape_identifier(label)) label = '' diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/query.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/query.py index c11e128ec6..54edbeff09 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/query.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/query.py @@ -1,22 +1,18 @@ -import ipaddress import logging import re -import uuid import pytz -from enum import Enum from io import IOBase from typing import Any, Tuple, Dict, Sequence, Optional, Union, Generator -from datetime import date, datetime, tzinfo +from datetime import tzinfo from pytz.exceptions import UnknownTimeZoneError -from clickhouse_connect import common from clickhouse_connect.driver import tzutil +from clickhouse_connect.driver.binding import bind_query from clickhouse_connect.driver.common import dict_copy, empty_gen, StreamContext from clickhouse_connect.driver.external import ExternalData from clickhouse_connect.driver.types import Matrix, Closable -from clickhouse_connect.json_impl import any_to_json from clickhouse_connect.driver.exceptions import StreamClosedError, ProgrammingError from clickhouse_connect.driver.options import check_arrow, pd_extended_dtypes from clickhouse_connect.driver.context import BaseQueryContext @@ -29,7 +25,6 @@ limit_re = re.compile(r'\s+LIMIT($|\s)', re.IGNORECASE) select_re = re.compile(r'(^|\s)SELECT\s', re.IGNORECASE) insert_re = re.compile(r'(^|\s)INSERT\s*INTO', re.IGNORECASE) command_re = re.compile(r'(^\s*)(' + commands + r')\s', re.IGNORECASE) -external_bind_re = re.compile(r'{.+:.+}') # pylint: disable=too-many-instance-attributes @@ -38,7 +33,7 @@ class QueryContext(BaseQueryContext): Argument/parameter object for queries. This context is used to set thread/query specific formats """ - # pylint: disable=duplicate-code,too-many-arguments,too-many-locals + # pylint: disable=duplicate-code,too-many-arguments,too-many-positional-arguments,too-many-locals def __init__(self, query: Union[str, bytes] = '', parameters: Optional[Dict[str, Any]] = None, @@ -176,6 +171,7 @@ class QueryContext(BaseQueryContext): return None return active_tz + # pylint disable=too-many-positional-arguments def updated_copy(self, query: Optional[Union[str, bytes]] = None, parameters: Optional[Dict[str, Any]] = None, @@ -343,159 +339,6 @@ class QueryResult(Closable): self._block_gen = None -BS = '\\' -must_escape = (BS, '\'', '`', '\t', '\n') - - -def quote_identifier(identifier: str): - first_char = identifier[0] - if first_char in ('`', '"') and identifier[-1] == first_char: - # Identifier is already quoted, assume that it's valid - return identifier - return f'`{escape_str(identifier)}`' - - -def finalize_query(query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]], - server_tz: Optional[tzinfo] = None) -> str: - while query.endswith(';'): - query = query[:-1] - if not parameters: - return query - if hasattr(parameters, 'items'): - return query % {k: format_query_value(v, server_tz) for k, v in parameters.items()} - return query % tuple(format_query_value(v) for v in parameters) - - -def bind_query(query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]], - server_tz: Optional[tzinfo] = None) -> Tuple[str, Dict[str, str]]: - while query.endswith(';'): - query = query[:-1] - if not parameters: - return query, {} - - binary_binds = None - if isinstance(parameters, dict): - binary_binds = {k: v for k, v in parameters.items() if k.startswith('$') and k.endswith('$') and len(k) > 1} - for key in binary_binds.keys(): - del parameters[key] - if external_bind_re.search(query) is None: - query, bound_params = finalize_query(query, parameters, server_tz), {} - else: - bound_params = {f'param_{k}': format_bind_value(v, server_tz) for k, v in parameters.items()} - if binary_binds: - binary_query = query.encode() - binary_indexes = {} - for k, v in binary_binds.items(): - key = k.encode() - item_index = 0 - while True: - item_index = binary_query.find(key, item_index) - if item_index == -1: - break - binary_indexes[item_index + len(key)] = key, v - item_index += len(key) - query = b'' - start = 0 - for loc in sorted(binary_indexes.keys()): - key, value = binary_indexes[loc] - query += binary_query[start:loc] + value + key - start = loc - query += binary_query[start:] - return query, bound_params - - -def format_str(value: str): - return f"'{escape_str(value)}'" - - -def escape_str(value: str): - return ''.join(f'{BS}{c}' if c in must_escape else c for c in value) - - -# pylint: disable=too-many-return-statements -def format_query_value(value: Any, server_tz: tzinfo = pytz.UTC): - """ - Format Python values in a ClickHouse query - :param value: Python object - :param server_tz: Server timezone for adjusting datetime values - :return: Literal string for python value - """ - if value is None: - return 'NULL' - if isinstance(value, str): - return format_str(value) - if isinstance(value, datetime): - if value.tzinfo is not None or server_tz != pytz.UTC: - value = value.astimezone(server_tz) - return f"'{value.strftime('%Y-%m-%d %H:%M:%S')}'" - if isinstance(value, date): - return f"'{value.isoformat()}'" - if isinstance(value, list): - return f"[{', '.join(str_query_value(x, server_tz) for x in value)}]" - if isinstance(value, tuple): - return f"({', '.join(str_query_value(x, server_tz) for x in value)})" - if isinstance(value, dict): - if common.get_setting('dict_parameter_format') == 'json': - return format_str(any_to_json(value).decode()) - pairs = [str_query_value(k, server_tz) + ':' + str_query_value(v, server_tz) - for k, v in value.items()] - return f"{{{', '.join(pairs)}}}" - if isinstance(value, Enum): - return format_query_value(value.value, server_tz) - if isinstance(value, (uuid.UUID, ipaddress.IPv4Address, ipaddress.IPv6Address)): - return f"'{value}'" - return value - - -def str_query_value(value: Any, server_tz: tzinfo = pytz.UTC): - return str(format_query_value(value, server_tz)) - - -# pylint: disable=too-many-branches -def format_bind_value(value: Any, server_tz: tzinfo = pytz.UTC, top_level: bool = True): - """ - Format Python values in a ClickHouse query - :param value: Python object - :param server_tz: Server timezone for adjusting datetime values - :param top_level: Flag for top level for nested structures - :return: Literal string for python value - """ - - def recurse(x): - return format_bind_value(x, server_tz, False) - - if value is None: - return '\\N' - if isinstance(value, str): - if top_level: - # At the top levels, strings must not be surrounded by quotes - return escape_str(value) - return format_str(value) - if isinstance(value, datetime): - value = value.astimezone(server_tz) - val = value.strftime('%Y-%m-%d %H:%M:%S') - if top_level: - return val - return f"'{val}'" - if isinstance(value, date): - if top_level: - return value.isoformat() - return f"'{value.isoformat()}'" - if isinstance(value, list): - return f"[{', '.join(recurse(x) for x in value)}]" - if isinstance(value, tuple): - return f"({', '.join(recurse(x) for x in value)})" - if isinstance(value, dict): - if common.get_setting('dict_parameter_format') == 'json': - return any_to_json(value).decode() - pairs = [recurse(k) + ':' + recurse(v) - for k, v in value.items()] - return f"{{{', '.join(pairs)}}}" - if isinstance(value, Enum): - return recurse(value.value) - return str(value) - - comment_re = re.compile(r"(\".*?\"|\'.*?\')|(/\*.*?\*/|(--\s)[^\n]*$)", re.MULTILINE | re.DOTALL) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/tools.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/tools.py index 54b4b45949..42480858d6 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/tools.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/tools.py @@ -2,7 +2,7 @@ from typing import Optional, Sequence, Dict, Any from clickhouse_connect.driver import Client from clickhouse_connect.driver.summary import QuerySummary -from clickhouse_connect.driver.query import quote_identifier +from clickhouse_connect.driver.binding import quote_identifier def insert_file(client: Client, diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/transform.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/transform.py index 03206d0c85..1d20d9c6e7 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/transform.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/transform.py @@ -96,7 +96,7 @@ class NativeTransform: col_enc = col_name.encode() write_leb128(len(col_enc), output) output += col_enc - col_enc = col_type.name.encode() + col_enc = col_type.insert_name.encode() write_leb128(len(col_enc), output) output += col_enc context.start_column(col_name) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/tools/testing.py b/contrib/python/clickhouse-connect/clickhouse_connect/tools/testing.py index 7084c71a40..56ced72501 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/tools/testing.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/tools/testing.py @@ -1,7 +1,7 @@ from typing import Sequence, Optional, Union, Dict, Any from clickhouse_connect.driver import Client -from clickhouse_connect.driver.query import quote_identifier, str_query_value +from clickhouse_connect.driver.binding import quote_identifier, str_query_value class TableContext: |