about | summary | refs | log | tree | commit | diff | stats
path: root/contrib/python/clickhouse-connect/clickhouse_connect
diff options
context:
space:
mode:
author: robot-piglet <robot-piglet@yandex-team.com> 2024-10-12 10:39:22 +0300
committer: robot-piglet <robot-piglet@yandex-team.com> 2024-10-12 10:50:34 +0300
commit: b904316f22d889171e7fe39e8c467cbea5c6cdd6 (patch)
tree: 84c870d76f7278ad6a8d1c020a8b950edb7d531e /contrib/python/clickhouse-connect/clickhouse_connect
parent: e493167a2cecbdc68258d70ddb044e7a0f56aa7f (diff)
download: ydb-b904316f22d889171e7fe39e8c467cbea5c6cdd6.tar.gz
Intermediate changes
commit_hash:3ed72e620c7eace6c8edd510ac2324e8acfcfafb
Diffstat (limited to 'contrib/python/clickhouse-connect/clickhouse_connect')
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/__version__.py | 2
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/datatypes/base.py | 2
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/ddl/custom.py | 4
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/dialect.py | 2
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/__init__.py | 2
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/ddlcompiler.py | 2
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/preparer.py | 2
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/common.py | 3
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/datatypes/__init__.py | 2
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/datatypes/base.py | 28
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/datatypes/container.py | 88
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/datatypes/dynamic.py | 257
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/datatypes/network.py | 8
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/datatypes/numeric.py | 26
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/datatypes/postinit.py | 4
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/datatypes/registry.py | 6
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/datatypes/special.py | 7
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/datatypes/string.py | 16
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/datatypes/temporal.py | 14
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py | 2
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py | 2
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/driver/binding.py | 199
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py | 24
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/driver/common.py | 19
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/driver/context.py | 2
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/driver/ctypes.py | 2
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/driver/errors.py | 8
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py | 29
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py | 61
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/driver/insert.py | 7
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/driver/parser.py | 2
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/driver/query.py | 165
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/driver/tools.py | 2
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/driver/transform.py | 2
-rw-r--r-- contrib/python/clickhouse-connect/clickhouse_connect/tools/testing.py | 2
35 files changed, 658 insertions, 345 deletions
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py b/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py
index d9d76edb88..c5e4522a93 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py
@@ -1 +1 @@
-version = '0.7.19'
+version = '0.8.0'
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/datatypes/base.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/datatypes/base.py
index 0c0d25e6b0..ce73da1481 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/datatypes/base.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/datatypes/base.py
@@ -5,7 +5,7 @@ from sqlalchemy.exc import CompileError
from clickhouse_connect.datatypes.base import ClickHouseType, TypeDef, EMPTY_TYPE_DEF
from clickhouse_connect.datatypes.registry import parse_name, type_map
-from clickhouse_connect.driver.query import str_query_value
+from clickhouse_connect.driver.binding import str_query_value
logger = logging.getLogger(__name__)
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/ddl/custom.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/ddl/custom.py
index b7eee4ad7d..9f7c1e6401 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/ddl/custom.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/ddl/custom.py
@@ -1,7 +1,7 @@
from sqlalchemy.sql.ddl import DDL
from sqlalchemy.exc import ArgumentError
-from clickhouse_connect.driver.query import quote_identifier
+from clickhouse_connect.driver.binding import quote_identifier
# pylint: disable=too-many-ancestors,abstract-method
@@ -31,7 +31,7 @@ class CreateDatabase(DDL):
super().__init__(stmt)
-# pylint: disable=too-many-ancestors,abstract-method
+# pylint: disable=too-many-ancestors,abstract-method
class DropDatabase(DDL):
"""
Alternative DDL statement for built in SqlAlchemy DropSchema DDL class
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/dialect.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/dialect.py
index 6b04c7e2fe..c9bb24c95a 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/dialect.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/dialect.py
@@ -8,7 +8,7 @@ from clickhouse_connect.cc_sqlalchemy.sql import full_table
from clickhouse_connect.cc_sqlalchemy.sql.ddlcompiler import ChDDLCompiler
from clickhouse_connect.cc_sqlalchemy import ischema_names, dialect_name
from clickhouse_connect.cc_sqlalchemy.sql.preparer import ChIdentifierPreparer
-from clickhouse_connect.driver.query import quote_identifier, format_str
+from clickhouse_connect.driver.binding import quote_identifier, format_str
# pylint: disable=too-many-public-methods,no-self-use,unused-argument
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/__init__.py
index 68becd54d6..00b9bc8c13 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/__init__.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/__init__.py
@@ -2,7 +2,7 @@ from typing import Optional
from sqlalchemy import Table
-from clickhouse_connect.driver.query import quote_identifier
+from clickhouse_connect.driver.binding import quote_identifier
def full_table(table_name: str, schema: Optional[str] = None) -> str:
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/ddlcompiler.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/ddlcompiler.py
index 5a97254705..8a2180c466 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/ddlcompiler.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/ddlcompiler.py
@@ -2,7 +2,7 @@ from sqlalchemy import Column
from sqlalchemy.sql.compiler import DDLCompiler
from clickhouse_connect.cc_sqlalchemy.sql import format_table
-from clickhouse_connect.driver.query import quote_identifier
+from clickhouse_connect.driver.binding import quote_identifier
class ChDDLCompiler(DDLCompiler):
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/preparer.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/preparer.py
index a31b3e7af6..f53a2bde37 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/preparer.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/preparer.py
@@ -1,6 +1,6 @@
from sqlalchemy.sql.compiler import IdentifierPreparer
-from clickhouse_connect.driver.query import quote_identifier
+from clickhouse_connect.driver.binding import quote_identifier
class ChIdentifierPreparer(IdentifierPreparer):
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/common.py b/contrib/python/clickhouse-connect/clickhouse_connect/common.py
index 1fda1ecfd7..ab960e76b7 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/common.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/common.py
@@ -81,3 +81,6 @@ _init_common('send_os_user', (True, False), True)
_init_common('use_protocol_version', (True, False), True)
_init_common('max_error_size', (), 1024)
+
+# HTTP raw data buffer for streaming queries. This should not be reduced below 64KB to ensure compatibility with LZ4 compression
+_init_common('http_buffer_size', (), 10 * 1024 * 1024)
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/__init__.py
index aa9b8c2f5d..e4229f66c3 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/__init__.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/__init__.py
@@ -4,4 +4,6 @@ import clickhouse_connect.datatypes.numeric
import clickhouse_connect.datatypes.special
import clickhouse_connect.datatypes.string
import clickhouse_connect.datatypes.temporal
+import clickhouse_connect.datatypes.dynamic
import clickhouse_connect.datatypes.registry
+import clickhouse_connect.datatypes.postinit
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/base.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/base.py
index b1990280eb..ab43b4e1db 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/base.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/base.py
@@ -3,7 +3,7 @@ import logging
from abc import ABC
from math import log
-from typing import NamedTuple, Dict, Type, Any, Sequence, MutableSequence, Optional, Union, Collection
+from typing import NamedTuple, Dict, Type, Any, Sequence, MutableSequence, Union, Collection
from clickhouse_connect.driver.common import array_type, int_size, write_array, write_uint64, low_card_version
from clickhouse_connect.driver.context import BaseQueryContext
@@ -94,6 +94,10 @@ class ClickHouseType(ABC):
name = f'{wrapper}({name})'
return name
+ @property
+ def insert_name(self):
+ return self.name
+
def data_size(self, sample: Sequence) -> int:
if self.low_card:
values = set(sample)
@@ -104,10 +108,13 @@ class ClickHouseType(ABC):
d_size += 1
return d_size
- def _data_size(self, _sample: Collection) -> int:
+ def _data_size(self, sample: Collection) -> int:
if self.byte_size:
return self.byte_size
- return 0
+ total = 0
+ for x in sample:
+ total += len(str(x))
+ return total / len(sample) + 1
def write_column_prefix(self, dest: bytearray):
"""
@@ -119,7 +126,7 @@ class ClickHouseType(ABC):
if self.low_card:
write_uint64(low_card_version, dest)
- def read_column_prefix(self, source: ByteSource):
+ def read_column_prefix(self, source: ByteSource, _ctx: QueryContext):
"""
Read the low cardinality version. Like the write method, this has to happen immediately for container classes
:param source: The native protocol binary read buffer
@@ -139,7 +146,7 @@ class ClickHouseType(ABC):
:param ctx: QueryContext for query specific settings
:return: The decoded column data as a sequence and the updated location pointer
"""
- self.read_column_prefix(source)
+ self.read_column_prefix(source, ctx)
return self.read_column_data(source, num_rows, ctx)
def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence:
@@ -274,18 +281,11 @@ class ClickHouseType(ABC):
write_uint64(len(index), dest)
self._write_column_binary(index, dest, ctx)
write_uint64(len(keys), dest)
- write_array(array_type(1 << ix_type, False), keys, dest)
+ write_array(array_type(1 << ix_type, False), keys, dest, ctx.column_name)
def _active_null(self, _ctx: QueryContext) -> Any:
return None
- def _first_value(self, column: Sequence) -> Optional[Any]:
- if self.nullable:
- return next((x for x in column if x is not None), None)
- if len(column):
- return column[0]
- return None
-
EMPTY_TYPE_DEF = TypeDef()
NULLABLE_TYPE_DEF = TypeDef(wrappers=('Nullable',))
@@ -338,7 +338,7 @@ class ArrayType(ClickHouseType, ABC, registered=False):
def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext):
if len(column) and self.nullable:
column = [0 if x is None else x for x in column]
- write_array(self._array_type, column, dest)
+ write_array(self._array_type, column, dest, ctx.column_name)
def _active_null(self, ctx: QueryContext):
if ctx.as_pandas and ctx.use_extended_dtypes:
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/container.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/container.py
index 445b24140d..b8244eefdb 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/container.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/container.py
@@ -3,11 +3,12 @@ import logging
from typing import Sequence, Collection
from clickhouse_connect.driver.insert import InsertContext
-from clickhouse_connect.driver.query import QueryContext, quote_identifier
+from clickhouse_connect.driver.query import QueryContext
+from clickhouse_connect.driver.binding import quote_identifier
from clickhouse_connect.driver.types import ByteSource
from clickhouse_connect.json_impl import any_to_json
from clickhouse_connect.datatypes.base import ClickHouseType, TypeDef
-from clickhouse_connect.driver.common import must_swap
+from clickhouse_connect.driver.common import must_swap, first_value
from clickhouse_connect.datatypes.registry import get_from_name
logger = logging.getLogger(__name__)
@@ -22,8 +23,8 @@ class Array(ClickHouseType):
self.element_type = get_from_name(type_def.values[0])
self._name_suffix = f'({self.element_type.name})'
- def read_column_prefix(self, source: ByteSource):
- return self.element_type.read_column_prefix(source)
+ def read_column_prefix(self, source: ByteSource, ctx:QueryContext):
+ return self.element_type.read_column_prefix(source, ctx)
def _data_size(self, sample: Sequence) -> int:
if len(sample) == 0:
@@ -102,7 +103,7 @@ class Tuple(ClickHouseType):
if len(sample) == 0:
return 0
elem_size = 0
- is_dict = self.element_names and isinstance(self._first_value(list(sample)), dict)
+ is_dict = self.element_names and isinstance(first_value(list(sample), self.nullable), dict)
for ix, e_type in enumerate(self.element_types):
if e_type.byte_size > 0:
elem_size += e_type.byte_size
@@ -112,9 +113,9 @@ class Tuple(ClickHouseType):
elem_size += e_type.data_size([x[ix] for x in sample])
return elem_size
- def read_column_prefix(self, source: ByteSource):
+ def read_column_prefix(self, source: ByteSource, ctx: QueryContext):
for e_type in self.element_types:
- e_type.read_column_prefix(source)
+ e_type.read_column_prefix(source, ctx)
def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext):
columns = []
@@ -138,7 +139,7 @@ class Tuple(ClickHouseType):
e_type.write_column_prefix(dest)
def write_column_data(self, column: Sequence, dest: bytearray, ctx: InsertContext):
- if self.element_names and isinstance(self._first_value(column), dict):
+ if self.element_names and isinstance(first_value(column, self.nullable), dict):
columns = self.convert_dict_insert(column)
else:
columns = list(zip(*column))
@@ -180,9 +181,9 @@ class Map(ClickHouseType):
total += self.value_type.data_size(x.values())
return total // len(sample)
- def read_column_prefix(self, source: ByteSource):
- self.key_type.read_column_prefix(source)
- self.value_type.read_column_prefix(source)
+ def read_column_prefix(self, source: ByteSource, ctx: QueryContext):
+ self.key_type.read_column_prefix(source, ctx)
+ self.value_type.read_column_prefix(source, ctx)
# pylint: disable=too-many-locals
def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext):
@@ -237,8 +238,8 @@ class Nested(ClickHouseType):
array_sample = [[tuple(sub_row[key] for key in keys) for sub_row in row] for row in sample]
return self.tuple_array.data_size(array_sample)
- def read_column_prefix(self, source: ByteSource):
- self.tuple_array.read_column_prefix(source)
+ def read_column_prefix(self, source: ByteSource, ctx:QueryContext):
+ self.tuple_array.read_column_prefix(source, ctx)
def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext):
keys = self.element_names
@@ -252,64 +253,3 @@ class Nested(ClickHouseType):
keys = self.element_names
data = [[tuple(sub_row[key] for key in keys) for sub_row in row] for row in column]
self.tuple_array.write_column_data(data, dest, ctx)
-
-
-class JSON(ClickHouseType):
- python_type = dict
- # Native is a Python type (primitive, dict, array), string is an actual JSON string
- valid_formats = 'string', 'native'
-
- def write_column_prefix(self, dest: bytearray):
- dest.append(0x01)
-
- def _data_size(self, sample: Collection) -> int:
- if len(sample) == 0:
- return 0
- total = 0
- for x in sample:
- if isinstance(x, str):
- total += len(x)
- elif x:
- total += len(any_to_json(x))
- return total // len(sample) + 1
-
- # pylint: disable=duplicate-code
- def write_column_data(self, column: Sequence, dest: bytearray, ctx: InsertContext):
- app = dest.append
- first = self._first_value(column)
- if isinstance(first, str) or self.write_format(ctx) == 'string':
- for x in column:
- v = x.encode()
- sz = len(v)
- while True:
- b = sz & 0x7f
- sz >>= 7
- if sz == 0:
- app(b)
- break
- app(0x80 | b)
- dest += v
- else:
- to_json = any_to_json
- for x in column:
- v = to_json(x)
- sz = len(v)
- while True:
- b = sz & 0x7f
- sz >>= 7
- if sz == 0:
- app(b)
- break
- app(0x80 | b)
- dest += v
-
-
-class Object(JSON):
- python_type = dict
-
- def __init__(self, type_def):
- data_type = type_def.values[0].lower().replace(' ', '')
- if data_type not in ("'json'", "nullable('json')"):
- raise NotImplementedError('Only json or Nullable(json) Object type is currently supported')
- super().__init__(type_def)
- self._name_suffix = type_def.arg_str
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/dynamic.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/dynamic.py
new file mode 100644
index 0000000000..e32145f1f8
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/dynamic.py
@@ -0,0 +1,257 @@
+from typing import List, Sequence, Collection
+
+from clickhouse_connect.datatypes.base import ClickHouseType, TypeDef
+from clickhouse_connect.datatypes.registry import get_from_name
+from clickhouse_connect.driver.common import unescape_identifier, first_value
+from clickhouse_connect.driver.ctypes import data_conv
+from clickhouse_connect.driver.errors import handle_error
+from clickhouse_connect.driver.exceptions import DataError
+from clickhouse_connect.driver.insert import InsertContext
+from clickhouse_connect.driver.query import QueryContext
+from clickhouse_connect.driver.types import ByteSource
+from clickhouse_connect.json_impl import any_to_json
+
+SHARED_DATA_TYPE: ClickHouseType
+STRING_DATA_TYPE: ClickHouseType
+
+class Variant(ClickHouseType):
+ _slots = 'element_types'
+ python_type = object
+
+ def __init__(self, type_def: TypeDef):
+ super().__init__(type_def)
+ self.element_types:List[ClickHouseType] = [get_from_name(name) for name in type_def.values]
+ self._name_suffix = f"({', '.join(ch_type.name for ch_type in self.element_types)})"
+
+ @property
+ def insert_name(self):
+ return 'String'
+
+ def read_column_prefix(self, source: ByteSource, ctx:QueryContext):
+ if source.read_uint64() != 0:
+ raise DataError(f'Unexpected discriminator format in Variant column {ctx.column_name}')
+
+ def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence:
+ return read_variant_column(self.element_types, source, num_rows, ctx)
+
+ def write_column_data(self, column: Sequence, dest: bytearray, ctx: InsertContext):
+ write_str_values(self, column, dest, ctx)
+
+
+def read_variant_column(variant_types: List[ClickHouseType], source: ByteSource, num_rows:int, ctx: QueryContext) -> Sequence:
+ v_count = len(variant_types)
+ discriminators = source.read_array('B', num_rows)
+ # We have to count up how many of each discriminator there are in the block to read the sub columns correctly
+ disc_rows = [0] * v_count
+ for disc in discriminators:
+ if disc != 255:
+ disc_rows[disc] += 1
+ sub_columns: List[Sequence] = [[]] * v_count
+ # Read all the sub-columns
+ for ix in range(v_count):
+ if disc_rows[ix] > 0:
+ sub_columns[ix] = variant_types[ix].read_column_data(source, disc_rows[ix], ctx)
+ # Now we have to walk through each of the discriminators again to assign the correct value from
+ # the sub-column to the final result column
+ sub_indexes = [0] * v_count
+ col = []
+ app_col = col.append
+ for disc in discriminators:
+ if disc == 255:
+ app_col(None)
+ else:
+ app_col(sub_columns[disc][sub_indexes[disc]])
+ sub_indexes[disc] += 1
+ return col
+
+
+class Dynamic(ClickHouseType):
+ python_type = object
+
+ @property
+ def insert_name(self):
+ return 'String'
+
+ def __init__(self, type_def:TypeDef):
+ super().__init__(type_def)
+ if type_def.keys and type_def.keys[0] == 'max_types':
+ self._name_suffix = f'(max_types={type_def.values[0]})'
+
+ def read_column(self, source: ByteSource, num_rows:int, ctx:QueryContext):
+ variant_types = read_dynamic_prefix(source)
+ return read_variant_column(variant_types, source, num_rows, ctx)
+
+ def write_column_data(self, column: Sequence, dest: bytearray, ctx: InsertContext):
+ write_str_values(self, column, dest, ctx)
+
+
+def read_dynamic_prefix(source: ByteSource) -> List[ClickHouseType]:
+ if source.read_uint64() != 1: # dynamic structure serialization version, currently only 1 is recognized
+ raise DataError('Unrecognized dynamic structure version')
+ source.read_leb128() # max dynamic types, we ignore this value
+ num_variants = source.read_leb128()
+ variant_types = [get_from_name(source.read_leb128_str()) for _ in range(num_variants)]
+ variant_types.append(STRING_DATA_TYPE)
+ if source.read_uint64() != 0: # discriminator format, currently only 0 is recognized
+ raise DataError('Unexpected discriminator format in Variant column prefix')
+ return variant_types
+
+
+def json_sample_size(_, sample: Collection) -> int:
+ if len(sample) == 0:
+ return 0
+ total = 0
+ for x in sample:
+ if isinstance(x, str):
+ total += len(x)
+ elif x:
+ total += len(any_to_json(x))
+ return total // len(sample) + 1
+
+
+def write_json(ch_type:ClickHouseType, column: Sequence, dest: bytearray, ctx: InsertContext):
+ first = first_value(column, ch_type.nullable)
+ write_col = column
+ encoding = ctx.encoding or ch_type.encoding
+ if not isinstance(first, str) and ch_type.write_format(ctx) != 'string':
+ to_json = any_to_json
+ write_col = [to_json(v) for v in column]
+ encoding = None
+ handle_error(data_conv.write_str_col(write_col, ch_type.nullable, encoding, dest), ctx)
+
+
+def write_str_values(ch_type:ClickHouseType, column: Sequence, dest: bytearray, ctx: InsertContext):
+ encoding = ctx.encoding or ch_type.encoding
+ col = [''] * len(column)
+ for ix, v in enumerate(column):
+ if v is None:
+ col[ix] = 'NULL'
+ else:
+ col[ix] = str(v)
+ handle_error(data_conv.write_str_col(col, False, encoding, dest), ctx)
+
+
+class JSON(ClickHouseType):
+ _slots = 'typed_paths', 'typed_types'
+ python_type = dict
+ valid_formats = 'string', 'native'
+ _data_size = json_sample_size
+ write_column_data = write_json
+ shared_data_type: ClickHouseType
+ max_dynamic_paths = 0
+ max_dynamic_types = 0
+ typed_paths = []
+ typed_types = []
+ skips = []
+
+ def __init__(self, type_def:TypeDef):
+ super().__init__(type_def)
+ typed_paths = []
+ typed_types = []
+ skips = []
+ parts = []
+ for key, value in zip(type_def.keys, type_def.values):
+ if key == 'max_dynamic_paths':
+ try:
+ self.max_dynamic_paths = int(value)
+ parts.append(f'{key} = {value}')
+ continue
+ except ValueError:
+ pass
+ if key == 'max_dynamic_types':
+ try:
+ self.max_dynamic_types = int(value)
+ parts.append(f'{key} = {value}')
+ continue
+ except ValueError:
+ pass
+ if key == 'SKIP':
+ if value.startswith('REGEXP'):
+ value = 'REGEXP ' + value[6:]
+ else:
+ if not value.startswith("`"):
+ value = f'`{value}`'
+ skips.append(value)
+ else:
+ key = unescape_identifier(key)
+ typed_paths.append(key)
+ typed_types.append(get_from_name(value))
+ key = f'`{key}`'
+ parts.append(f'{key} {value}')
+ if typed_paths:
+ self.typed_paths = typed_paths
+ self.typed_types = typed_types
+ if skips:
+ self.skips = skips
+ if parts:
+ self._name_suffix = f'({", ".join(parts)})'
+
+ @property
+ def insert_name(self):
+ return 'String'
+
+ # pylint: disable=too-many-locals
+ def read_column(self, source: ByteSource, num_rows: int, ctx: QueryContext):
+ if source.read_uint64() != 0: # object serialization version, currently only 0 is recognized
+ raise DataError(f'unrecognized object serialization version, column `{ctx.column_name}`')
+ source.read_leb128() # the max number of dynamic paths. Used to preallocate storage in ClickHouse; we ignore it
+ dynamic_path_cnt = source.read_leb128()
+ dynamic_paths = [source.read_leb128_str() for _ in range(dynamic_path_cnt)]
+ for typed in self.typed_types:
+ typed.read_column_prefix(source, ctx)
+ dynamic_variants = [read_dynamic_prefix(source) for _ in range(dynamic_path_cnt)]
+ # C++ prefix read ends here
+
+ typed_columns = [ch_type.read_column_data(source, num_rows, ctx) for ch_type in self.typed_types]
+ dynamic_columns = [read_variant_column(dynamic_variants[ix], source, num_rows, ctx) for ix in range(dynamic_path_cnt)]
+ SHARED_DATA_TYPE.read_column_data(source, num_rows, ctx)
+ col = []
+ for row_num in range(num_rows):
+ top = {}
+ for ix, field in enumerate(self.typed_paths):
+ value = typed_columns[ix][row_num]
+ item = top
+ chain = field.split('.')
+ for key in chain[:-1]:
+ child = item.get(key)
+ if child is None:
+ child = {}
+ item[key] = child
+ item = child
+ item[chain[-1]] = value
+ for ix, field in enumerate(dynamic_paths):
+ value = dynamic_columns[ix][row_num]
+ if value is None:
+ continue
+ item = top
+ chain = field.split('.')
+ for key in chain[:-1]:
+ child = item.get(key)
+ if child is None:
+ child = {}
+ item[key] = child
+ item = child
+ item[chain[-1]] = value
+ col.append(top)
+ if self.read_format(ctx) == 'string':
+ return [any_to_json(v) for v in col]
+ return col
+
+
+# Note that this type is deprecated and should not be used; it is included for temporary backward compatibility only
+class Object(ClickHouseType):
+ python_type = dict
+ # Native is a Python type (primitive, dict, array), string is an actual JSON string
+ valid_formats = 'string', 'native'
+ _data_size = json_sample_size
+ write_column_data = write_json
+
+ def __init__(self, type_def):
+ data_type = type_def.values[0].lower().replace(' ', '')
+ if data_type not in ("'json'", "nullable('json')"):
+ raise NotImplementedError('Only json or Nullable(json) Object type is currently supported')
+ super().__init__(type_def)
+ self._name_suffix = type_def.arg_str
+
+ def write_column_prefix(self, dest: bytearray):
+ dest.append(0x01)
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/network.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/network.py
index 14b7bc3b9a..f44686367d 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/network.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/network.py
@@ -3,7 +3,7 @@ from ipaddress import IPv4Address, IPv6Address
from typing import Union, MutableSequence, Sequence
from clickhouse_connect.datatypes.base import ClickHouseType
-from clickhouse_connect.driver.common import write_array, int_size
+from clickhouse_connect.driver.common import write_array, int_size, first_value
from clickhouse_connect.driver.insert import InsertContext
from clickhouse_connect.driver.query import QueryContext
from clickhouse_connect.driver.types import ByteSource
@@ -29,7 +29,7 @@ class IPv4(ClickHouseType):
return data_conv.read_ipv4_col(source, num_rows)
def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext):
- first = self._first_value(column)
+ first = first_value(column, self.nullable)
if isinstance(first, str):
fixed = 24, 16, 8, 0
# pylint: disable=consider-using-generator
@@ -39,7 +39,7 @@ class IPv4(ClickHouseType):
column = [x._ip if x else 0 for x in column]
else:
column = [x._ip for x in column]
- write_array(self._array_type, column, dest)
+ write_array(self._array_type, column, dest, ctx.column_name)
def _active_null(self, ctx: QueryContext):
fmt = self.read_format(ctx)
@@ -103,7 +103,7 @@ class IPv6(ClickHouseType):
def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext):
v = V6_NULL
- first = self._first_value(column)
+ first = first_value(column, self.nullable)
v4mask = IPV4_V6_MASK
af6 = socket.AF_INET6
tov6 = socket.inet_pton
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/numeric.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/numeric.py
index 2cd9e5f105..0ff62b1dc2 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/numeric.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/numeric.py
@@ -4,7 +4,7 @@ from typing import Union, Type, Sequence, MutableSequence
from math import nan
from clickhouse_connect.datatypes.base import TypeDef, ArrayType, ClickHouseType
-from clickhouse_connect.driver.common import array_type, write_array, decimal_size, decimal_prec
+from clickhouse_connect.driver.common import array_type, write_array, decimal_size, decimal_prec, first_value
from clickhouse_connect.driver.ctypes import numpy_conv, data_conv
from clickhouse_connect.driver.insert import InsertContext
from clickhouse_connect.driver.options import pd, np
@@ -98,7 +98,7 @@ class BigInt(ClickHouseType, registered=False):
def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext):
if len(column) == 0:
return
- first = self._first_value(column)
+ first = first_value(column, self.nullable)
sz = self.byte_size
signed = self._signed
empty = bytes(b'\x00' * sz)
@@ -189,8 +189,8 @@ class Bool(ClickHouseType):
return np.array(column)
return column
- def _write_column_binary(self, column, dest, _ctx):
- write_array('B', [1 if x else 0 for x in column], dest)
+ def _write_column_binary(self, column, dest, ctx):
+ write_array('B', [1 if x else 0 for x in column], dest, ctx.column_name)
class Boolean(Bool):
@@ -218,15 +218,15 @@ class Enum(ClickHouseType):
lookup = self._int_map.get
return [lookup(x, None) for x in column]
- def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, _ctx):
- first = self._first_value(column)
+ def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx:InsertContext):
+ first = first_value(column, self.nullable)
if first is None or not isinstance(first, str):
if self.nullable:
column = [0 if not x else x for x in column]
- write_array(self._array_type, column, dest)
+ write_array(self._array_type, column, dest, ctx.column_name)
else:
lookup = self._name_map.get
- write_array(self._array_type, [lookup(x, 0) for x in column], dest)
+ write_array(self._array_type, [lookup(x, 0) for x in column], dest, ctx.column_name)
class Enum8(Enum):
@@ -285,15 +285,15 @@ class Decimal(ClickHouseType):
app(dec(f'-{digits[:-scale]}.{digits[-scale:]}'))
return new_col
- def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, _ctx):
- with decimal.localcontext() as ctx:
- ctx.prec = self.prec
+ def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx:InsertContext):
+ with decimal.localcontext() as dec_ctx:
+ dec_ctx.prec = self.prec
dec = decimal.Decimal
mult = self._mult
if self.nullable:
- write_array(self._array_type, [int(dec(str(x)) * mult) if x else 0 for x in column], dest)
+ write_array(self._array_type, [int(dec(str(x)) * mult) if x else 0 for x in column], dest, ctx.column_name)
else:
- write_array(self._array_type, [int(dec(str(x)) * mult) for x in column], dest)
+ write_array(self._array_type, [int(dec(str(x)) * mult) for x in column], dest, ctx.column_name)
def _active_null(self, ctx: QueryContext):
if ctx.use_none:
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/postinit.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/postinit.py
new file mode 100644
index 0000000000..3536c65195
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/postinit.py
@@ -0,0 +1,4 @@
+from clickhouse_connect.datatypes import registry, dynamic
+
+dynamic.SHARED_DATA_TYPE = registry.get_from_name('Array(String, String)')
+dynamic.STRING_DATA_TYPE = registry.get_from_name('String')
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/registry.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/registry.py
index 52d1036787..6fb20d8af6 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/registry.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/registry.py
@@ -35,6 +35,12 @@ def parse_name(name: str) -> Tuple[str, str, TypeDef]:
elif base.startswith('Tuple'):
keys, values = parse_columns(base[5:])
base = 'Tuple'
+ elif base.startswith('Variant'):
+ keys, values = parse_columns(base[7:])
+ base = 'Variant'
+ elif base.startswith('JSON') and len(base) > 4 and base[4] == '(':
+ keys, values = parse_columns(base[4:])
+ base = 'JSON'
elif base == 'Point':
values = ('Float64', 'Float64')
else:
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/special.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/special.py
index 29d28e3378..9d296d00fc 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/special.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/special.py
@@ -3,6 +3,7 @@ from uuid import UUID as PYUUID
from clickhouse_connect.datatypes.base import TypeDef, ClickHouseType, ArrayType, UnsupportedType
from clickhouse_connect.datatypes.registry import get_from_name
+from clickhouse_connect.driver.common import first_value
from clickhouse_connect.driver.ctypes import data_conv
from clickhouse_connect.driver.insert import InsertContext
from clickhouse_connect.driver.query import QueryContext
@@ -37,7 +38,7 @@ class UUID(ClickHouseType):
# pylint: disable=too-many-branches
def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext):
- first = self._first_value(column)
+ first = first_value(column, self.nullable)
empty = empty_uuid_b
if isinstance(first, str) or self.write_format(ctx) == 'string':
for v in column:
@@ -92,8 +93,8 @@ class SimpleAggregateFunction(ClickHouseType):
def _data_size(self, sample: Sequence) -> int:
return self.element_type.data_size(sample)
- def read_column_prefix(self, source: ByteSource):
- return self.element_type.read_column_prefix(source)
+ def read_column_prefix(self, source: ByteSource, ctx: QueryContext):
+ return self.element_type.read_column_prefix(source, ctx)
def write_column_prefix(self, dest: bytearray):
self.element_type.write_column_prefix(dest)
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/string.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/string.py
index 3cfcd38630..4b7886510d 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/string.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/string.py
@@ -1,10 +1,10 @@
from typing import Sequence, MutableSequence, Union, Collection
+from clickhouse_connect.driver.common import first_value
from clickhouse_connect.driver.ctypes import data_conv
from clickhouse_connect.datatypes.base import ClickHouseType, TypeDef
from clickhouse_connect.driver.errors import handle_error
-from clickhouse_connect.driver.exceptions import DataError
from clickhouse_connect.driver.insert import InsertContext
from clickhouse_connect.driver.query import QueryContext
from clickhouse_connect.driver.types import ByteSource
@@ -45,9 +45,9 @@ class String(ClickHouseType):
def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext):
encoding = None
- if not isinstance(self._first_value(column), bytes):
+ if not isinstance(first_value(column, self.nullable), bytes):
encoding = ctx.encoding or self.encoding
- handle_error(data_conv.write_str_col(column, self.nullable, encoding, dest))
+ handle_error(data_conv.write_str_col(column, self.nullable, encoding, dest), ctx)
def _active_null(self, ctx):
if ctx.use_none:
@@ -92,7 +92,7 @@ class FixedString(ClickHouseType):
empty = bytes((0,) * sz)
str_enc = str.encode
enc = ctx.encoding or self.encoding
- first = self._first_value(column)
+ first = first_value(column, self.nullable)
if isinstance(first, str) or self.write_format(ctx) == 'string':
if self.nullable:
for x in column:
@@ -104,7 +104,7 @@ class FixedString(ClickHouseType):
except UnicodeEncodeError:
b = empty
if len(b) > sz:
- raise DataError(f'UTF-8 encoded FixedString value {b.hex(" ")} exceeds column size {sz}')
+ raise ctx.data_error(f'UTF-8 encoded FixedString value {b.hex(" ")} exceeds column size {sz}')
ext(b)
ext(empty[:sz - len(b)])
else:
@@ -114,7 +114,7 @@ class FixedString(ClickHouseType):
except UnicodeEncodeError:
b = empty
if len(b) > sz:
- raise DataError(f'UTF-8 encoded FixedString value {b.hex(" ")} exceeds column size {sz}')
+ raise ctx.data_error(f'UTF-8 encoded FixedString value {b.hex(" ")} exceeds column size {sz}')
ext(b)
ext(empty[:sz - len(b)])
elif self.nullable:
@@ -122,11 +122,11 @@ class FixedString(ClickHouseType):
if not b:
ext(empty)
elif len(b) != sz:
- raise DataError(f'Fixed String binary value {b.hex(" ")} does not match column size {sz}')
+ raise ctx.data_error(f'Fixed String binary value {b.hex(" ")} does not match column size {sz}')
else:
ext(b)
else:
for b in column:
if len(b) != sz:
- raise DataError(f'Fixed String binary value {b.hex(" ")} does not match column size {sz}')
+ raise ctx.data_error(f'Fixed String binary value {b.hex(" ")} does not match column size {sz}')
ext(b)
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/temporal.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/temporal.py
index da672823b6..6359d5ba55 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/temporal.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/temporal.py
@@ -4,7 +4,7 @@ from datetime import date, datetime, tzinfo
from typing import Union, Sequence, MutableSequence
from clickhouse_connect.datatypes.base import TypeDef, ClickHouseType
-from clickhouse_connect.driver.common import write_array, np_date_types, int_size
+from clickhouse_connect.driver.common import write_array, np_date_types, int_size, first_value
from clickhouse_connect.driver.exceptions import ProgrammingError
from clickhouse_connect.driver.ctypes import data_conv, numpy_conv
from clickhouse_connect.driver.insert import InsertContext
@@ -32,7 +32,7 @@ class Date(ClickHouseType):
return data_conv.read_date_col(source, num_rows)
def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext):
- first = self._first_value(column)
+ first = first_value(column, self.nullable)
if isinstance(first, int) or self.write_format(ctx) == 'int':
if self.nullable:
column = [x if x else 0 for x in column]
@@ -45,7 +45,7 @@ class Date(ClickHouseType):
column = [0 if x is None else (x - esd).days for x in column]
else:
column = [(x - esd).days for x in column]
- write_array(self._array_type, column, dest)
+ write_array(self._array_type, column, dest, ctx.column_name)
def _active_null(self, ctx: QueryContext):
fmt = self.read_format(ctx)
@@ -127,7 +127,7 @@ class DateTime(DateTimeBase):
return data_conv.read_datetime_col(source, num_rows, active_tz)
def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext):
- first = self._first_value(column)
+ first = first_value(column, self.nullable)
if isinstance(first, int) or self.write_format(ctx) == 'int':
if self.nullable:
column = [x if x else 0 for x in column]
@@ -136,7 +136,7 @@ class DateTime(DateTimeBase):
column = [int(x.timestamp()) if x else 0 for x in column]
else:
column = [int(x.timestamp()) for x in column]
- write_array(self._array_type, column, dest)
+ write_array(self._array_type, column, dest, ctx.column_name)
class DateTime64(DateTimeBase):
@@ -202,7 +202,7 @@ class DateTime64(DateTimeBase):
return new_col
def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext):
- first = self._first_value(column)
+ first = first_value(column, self.nullable)
if isinstance(first, int) or self.write_format(ctx) == 'int':
if self.nullable:
column = [x if x else 0 for x in column]
@@ -213,4 +213,4 @@ class DateTime64(DateTimeBase):
for x in column]
else:
column = [((int(x.timestamp()) * 1000000 + x.microsecond) * prec) // 1000000 for x in column]
- write_array('q', column, dest)
+ write_array('q', column, dest, ctx.column_name)
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py
index 27e16733d8..fe3e4b3fbc 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py
@@ -84,7 +84,7 @@ def create_client(*,
database = database or parsed.path
for k, v in parse_qs(parsed.query).items():
kwargs[k] = v[0]
- use_tls = str(secure).lower() == 'true' or interface == 'https' or (not interface and port in (443, 8443))
+ use_tls = str(secure).lower() == 'true' or interface == 'https' or (not interface and str(port) in ('443', '8443'))
if not host:
host = 'localhost'
if not interface:
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py
index e2dc5b0118..bf56c9783f 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py
@@ -13,7 +13,7 @@ from clickhouse_connect.datatypes.base import ClickHouseType
from clickhouse_connect.driver.insert import InsertContext
-# pylint: disable=too-many-public-methods, too-many-instance-attributes, too-many-arguments, too-many-locals
+# pylint: disable=too-many-public-methods,too-many-instance-attributes,too-many-arguments,too-many-positional-arguments,too-many-locals
class AsyncClient:
"""
AsyncClient is a wrapper around the ClickHouse Client object that allows for async calls to the ClickHouse server.
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/binding.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/binding.py
new file mode 100644
index 0000000000..24522b0880
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/binding.py
@@ -0,0 +1,199 @@
+import ipaddress
+import re
+import uuid
+from datetime import tzinfo, datetime, date
+from enum import Enum
+from typing import Optional, Union, Sequence, Dict, Any, Tuple
+
+import pytz
+
+from clickhouse_connect import common
+from clickhouse_connect.driver.common import dict_copy
+from clickhouse_connect.json_impl import any_to_json
+
+BS = '\\'
+must_escape = (BS, '\'', '`', '\t', '\n')
+external_bind_re = re.compile(r'{.+:.+}')
+
+
+class DT64Param:
+ def __init__(self, value: datetime):
+ self.value = value
+
+ def format(self, tz: tzinfo, top_level:bool) -> str:
+ value = self.value
+ if tz:
+ value = value.astimezone(tz)
+ s = value.strftime('%Y-%m-%d %H:%M:%S.%f')
+ if top_level:
+ return s
+ return f"'{s}'"
+
+
+def quote_identifier(identifier: str):
+ first_char = identifier[0]
+ if first_char in ('`', '"') and identifier[-1] == first_char:
+ # Identifier is already quoted, assume that it's valid
+ return identifier
+ return f'`{escape_str(identifier)}`'
+
+
+def finalize_query(query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]],
+ server_tz: Optional[tzinfo] = None) -> str:
+ while query.endswith(';'):
+ query = query[:-1]
+ if not parameters:
+ return query
+ if hasattr(parameters, 'items'):
+ return query % {k: format_query_value(v, server_tz) for k, v in parameters.items()}
+ return query % tuple(format_query_value(v, server_tz) for v in parameters)
+
+
+# pylint: disable=too-many-locals,too-many-branches
+def bind_query(query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]],
+ server_tz: Optional[tzinfo] = None) -> Tuple[str, Dict[str, str]]:
+ while query.endswith(';'):
+ query = query[:-1]
+ if not parameters:
+ return query, {}
+
+ binary_binds = None
+
+ if isinstance(parameters, dict):
+ params_copy = dict_copy(parameters)
+ binary_binds = {k: v for k, v in params_copy.items() if k.startswith('$') and k.endswith('$') and len(k) > 1}
+ for key in binary_binds.keys():
+ del params_copy[key]
+
+ final_params = {}
+ for k, v in params_copy.items():
+ if k.endswith('_64'):
+ if isinstance(v, datetime):
+ k = k[:-3]
+ v = DT64Param(v)
+ elif isinstance(v, list) and len(v) > 0 and isinstance(v[0], datetime):
+ k = k[:-3]
+ v = [DT64Param(x) for x in v]
+ final_params[k] = v
+ if external_bind_re.search(query) is None:
+ query, bound_params = finalize_query(query, final_params, server_tz), {}
+ else:
+ bound_params = {f'param_{k}': format_bind_value(v, server_tz) for k, v in final_params.items()}
+ else:
+ query, bound_params = finalize_query(query, parameters, server_tz), {}
+ if binary_binds:
+ binary_query = query.encode()
+ binary_indexes = {}
+ for k, v in binary_binds.items():
+ key = k.encode()
+ item_index = 0
+ while True:
+ item_index = binary_query.find(key, item_index)
+ if item_index == -1:
+ break
+ binary_indexes[item_index + len(key)] = key, v
+ item_index += len(key)
+ query = b''
+ start = 0
+ for loc in sorted(binary_indexes.keys()):
+ key, value = binary_indexes[loc]
+ query += binary_query[start:loc] + value + key
+ start = loc
+ query += binary_query[start:]
+ return query, bound_params
+
+
+def format_str(value: str):
+ return f"'{escape_str(value)}'"
+
+
+def escape_str(value: str):
+ return ''.join(f'{BS}{c}' if c in must_escape else c for c in value)
+
+
+# pylint: disable=too-many-return-statements
+def format_query_value(value: Any, server_tz: tzinfo = pytz.UTC):
+ """
+ Format Python values in a ClickHouse query
+ :param value: Python object
+ :param server_tz: Server timezone for adjusting datetime values
+ :return: Literal string for python value
+ """
+ if value is None:
+ return 'NULL'
+ if isinstance(value, str):
+ return format_str(value)
+ if isinstance(value, DT64Param):
+ return value.format(server_tz, False)
+ if isinstance(value, datetime):
+ if value.tzinfo is not None or server_tz != pytz.UTC:
+ value = value.astimezone(server_tz)
+ return f"'{value.strftime('%Y-%m-%d %H:%M:%S')}'"
+ if isinstance(value, date):
+ return f"'{value.isoformat()}'"
+ if isinstance(value, list):
+ return f"[{', '.join(str_query_value(x, server_tz) for x in value)}]"
+ if isinstance(value, tuple):
+ return f"({', '.join(str_query_value(x, server_tz) for x in value)})"
+ if isinstance(value, dict):
+ if common.get_setting('dict_parameter_format') == 'json':
+ return format_str(any_to_json(value).decode())
+ pairs = [str_query_value(k, server_tz) + ':' + str_query_value(v, server_tz)
+ for k, v in value.items()]
+ return f"{{{', '.join(pairs)}}}"
+ if isinstance(value, Enum):
+ return format_query_value(value.value, server_tz)
+ if isinstance(value, (uuid.UUID, ipaddress.IPv4Address, ipaddress.IPv6Address)):
+ return f"'{value}'"
+ return value
+
+
+def str_query_value(value: Any, server_tz: tzinfo = pytz.UTC):
+ return str(format_query_value(value, server_tz))
+
+
+# pylint: disable=too-many-branches
+def format_bind_value(value: Any, server_tz: tzinfo = pytz.UTC, top_level: bool = True):
+ """
+ Format Python values in a ClickHouse query
+ :param value: Python object
+ :param server_tz: Server timezone for adjusting datetime values
+ :param top_level: Flag for top level for nested structures
+ :return: Literal string for python value
+ """
+
+ def recurse(x):
+ return format_bind_value(x, server_tz, False)
+
+ if value is None:
+ return '\\N'
+ if isinstance(value, str):
+ if top_level:
+ # At the top levels, strings must not be surrounded by quotes
+ return escape_str(value)
+ return format_str(value)
+ if isinstance(value, DT64Param):
+ return value.format(server_tz, top_level)
+ if isinstance(value, datetime):
+ value = value.astimezone(server_tz)
+ val = value.strftime('%Y-%m-%d %H:%M:%S')
+ if top_level:
+ return val
+ return f"'{val}'"
+ if isinstance(value, date):
+ if top_level:
+ return value.isoformat()
+ return f"'{value.isoformat()}'"
+ if isinstance(value, list):
+ return f"[{', '.join(recurse(x) for x in value)}]"
+ if isinstance(value, tuple):
+ return f"({', '.join(recurse(x) for x in value)})"
+ if isinstance(value, dict):
+ if common.get_setting('dict_parameter_format') == 'json':
+ return any_to_json(value).decode()
+ pairs = [recurse(k) + ':' + recurse(v)
+ for k, v in value.items()]
+ return f"{{{', '.join(pairs)}}}"
+ if isinstance(value, Enum):
+ return recurse(value.value)
+ return str(value)
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py
index ba3873cbd8..fe11c27883 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py
@@ -20,15 +20,15 @@ from clickhouse_connect.driver.external import ExternalData
from clickhouse_connect.driver.insert import InsertContext
from clickhouse_connect.driver.summary import QuerySummary
from clickhouse_connect.driver.models import ColumnDef, SettingDef, SettingStatus
-from clickhouse_connect.driver.query import QueryResult, to_arrow, to_arrow_batches, QueryContext, arrow_buffer, \
- quote_identifier
+from clickhouse_connect.driver.query import QueryResult, to_arrow, to_arrow_batches, QueryContext, arrow_buffer
+from clickhouse_connect.driver.binding import quote_identifier
io.DEFAULT_BUFFER_SIZE = 1024 * 256
logger = logging.getLogger(__name__)
arrow_str_setting = 'output_format_arrow_string_as_string'
-# pylint: disable=too-many-public-methods, too-many-instance-attributes
+# pylint: disable=too-many-public-methods,too-many-arguments,too-many-positional-arguments,too-many-instance-attributes
class Client(ABC):
"""
Base ClickHouse Connect client
@@ -93,6 +93,8 @@ class Client(ABC):
})
if test_data[8:16] == b'\x01\x01\x05check':
self.protocol_version = PROTOCOL_VERSION_WITH_LOW_CARD
+ if self._setting_status('date_time_input_format').is_writable:
+ self.set_client_setting('date_time_input_format', 'best_effort')
self.uri = uri
def _validate_settings(self, settings: Optional[Dict[str, Any]]) -> Dict[str, str]:
@@ -171,7 +173,7 @@ class Client(ABC):
:return: The string value of the setting, if it exists, or None
"""
- # pylint: disable=too-many-arguments,unused-argument,too-many-locals
+ # pylint: disable=unused-argument,too-many-locals
def query(self,
query: Optional[str] = None,
parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
@@ -303,7 +305,7 @@ class Client(ABC):
:return: io.IOBase stream/iterator for the result
"""
- # pylint: disable=duplicate-code,too-many-arguments,unused-argument
+ # pylint: disable=duplicate-code,unused-argument
def query_np(self,
query: Optional[str] = None,
parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
@@ -341,7 +343,7 @@ class Client(ABC):
"""
return self._context_query(locals(), use_numpy=True, streaming=True).np_stream
- # pylint: disable=duplicate-code,too-many-arguments,unused-argument
+ # pylint: disable=duplicate-code,unused-argument
def query_df(self,
query: Optional[str] = None,
parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
@@ -364,7 +366,7 @@ class Client(ABC):
"""
return self._context_query(locals(), use_numpy=True, as_pandas=True).df_result
- # pylint: disable=duplicate-code,too-many-arguments,unused-argument
+ # pylint: disable=duplicate-code,unused-argument
def query_df_stream(self,
query: Optional[str] = None,
parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
@@ -571,7 +573,6 @@ class Client(ABC):
:return: ClickHouse server is up and reachable
"""
- # pylint: disable=too-many-arguments
def insert(self,
table: Optional[str] = None,
data: Sequence[Sequence[Any]] = None,
@@ -777,11 +778,18 @@ class Client(ABC):
:param fmt: Valid clickhouse format
"""
+ @abstractmethod
def close(self):
"""
Subclass implementation to close the connection to the server/deallocate the client
"""
+ @abstractmethod
+ def close_connections(self):
+ """
+ Subclass implementation to disconnect all "re-used" client connections
+ """
+
def _context_query(self, lcls: dict, **overrides):
kwargs = lcls.copy()
kwargs.pop('self')
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/common.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/common.py
index dca0dc9317..404a09e867 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/common.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/common.py
@@ -7,6 +7,7 @@ from typing import Sequence, MutableSequence, Dict, Optional, Union, Generator
from clickhouse_connect.driver.exceptions import ProgrammingError, StreamClosedError, DataError
from clickhouse_connect.driver.types import Closable
+
# pylint: disable=invalid-name
must_swap = sys.byteorder == 'big'
int_size = array.array('i').itemsize
@@ -38,12 +39,13 @@ def array_type(size: int, signed: bool):
return code if signed else code.upper()
-def write_array(code: str, column: Sequence, dest: MutableSequence):
+def write_array(code: str, column: Sequence, dest: MutableSequence, col_name: Optional[str]=None):
"""
Write a column of native Python data matching the array.array code
:param code: Python array.array code matching the column data type
:param column: Column of native Python values
:param dest: Destination byte buffer
+ :param col_name: Optional column name for error tracking
"""
if len(column) and not isinstance(column[0], (int, float)):
if code in ('f', 'F', 'd', 'D'):
@@ -54,8 +56,11 @@ def write_array(code: str, column: Sequence, dest: MutableSequence):
buff = struct.Struct(f'<{len(column)}{code}')
dest += buff.pack(*column)
except (TypeError, OverflowError, struct.error) as ex:
- raise DataError('Unable to create Python array. This is usually caused by trying to insert None ' +
- 'values into a ClickHouse column that is not Nullable') from ex
+ col_msg = ''
+ if col_name:
+ col_msg = f' for source column `{col_name}`'
+ raise DataError(f'Unable to create Python array{col_msg}. This is usually caused by trying to insert None ' +
+ 'values into a ClickHouse column that is not Nullable') from ex
def write_uint64(value: int, dest: MutableSequence):
@@ -134,6 +139,14 @@ def coerce_bool(val: Optional[Union[str, bool]]):
return val is True or (isinstance(val, str) and val.lower() in ('true', '1', 'y', 'yes'))
+def first_value(column: Sequence, nullable:bool = True):
+ if nullable:
+ return next((x for x in column if x is not None), None)
+ if len(column):
+ return column[0]
+ return None
+
+
class SliceView(Sequence):
"""
Provides a view into a sequence rather than copying. Borrows liberally from
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/context.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/context.py
index ad2b0d38d1..00125a0bdd 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/context.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/context.py
@@ -36,6 +36,7 @@ class BaseQueryContext:
for type_name, fmt in fmt.items()}
self.query_formats = query_formats or {}
self.column_formats = column_formats or {}
+ self.column_name = None
self.encoding = encoding
self.use_numpy = use_numpy
self.use_extended_dtypes = use_extended_dtypes
@@ -43,6 +44,7 @@ class BaseQueryContext:
self._active_col_type_fmts = _empty_map
def start_column(self, name: str):
+ self.column_name = name
self._active_col_fmt = self.col_simple_formats.get(name)
self._active_col_type_fmts = self.col_type_formats.get(name, _empty_map)
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/ctypes.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/ctypes.py
index e7bb607e68..39e5954c1f 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/ctypes.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/ctypes.py
@@ -46,4 +46,4 @@ def connect_numpy():
str(ex))
-connect_c_modules()
+# connect_c_modules()
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/errors.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/errors.py
index 0d37890b7e..e5b3b6466e 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/errors.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/errors.py
@@ -1,3 +1,4 @@
+from clickhouse_connect.driver.context import BaseQueryContext
from clickhouse_connect.driver.exceptions import DataError
@@ -8,6 +9,9 @@ NONE_IN_NULLABLE_COLUMN = 1
error_messages = {NONE_IN_NULLABLE_COLUMN: 'Invalid None value in non-Nullable column'}
-def handle_error(error_num: int):
+def handle_error(error_num: int, ctx: BaseQueryContext):
if error_num > 0:
- raise DataError(error_messages[error_num])
+ msg = error_messages[error_num]
+ if ctx.column_name:
+ msg = f'{msg}, column name: `{ctx.column_name}`'
+ raise DataError(msg)
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py
index bf3c0d863f..001c578b26 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py
@@ -24,7 +24,8 @@ from clickhouse_connect.driver.external import ExternalData
from clickhouse_connect.driver.httputil import ResponseSource, get_pool_manager, get_response_data, \
default_pool_manager, get_proxy_manager, all_managers, check_env_proxy, check_conn_expiration
from clickhouse_connect.driver.insert import InsertContext
-from clickhouse_connect.driver.query import QueryResult, QueryContext, quote_identifier, bind_query
+from clickhouse_connect.driver.query import QueryResult, QueryContext
+from clickhouse_connect.driver.binding import quote_identifier, bind_query
from clickhouse_connect.driver.summary import QuerySummary
from clickhouse_connect.driver.transform import NativeTransform
@@ -44,7 +45,7 @@ class HttpClient(Client):
'enable_http_compression'}
_owns_pool_manager = False
- # pylint: disable=too-many-arguments,too-many-locals,too-many-branches,too-many-statements,unused-argument
+ # pylint: disable=too-many-positional-arguments,too-many-arguments,too-many-locals,too-many-branches,too-many-statements,unused-argument
def __init__(self,
interface: str,
host: str,
@@ -70,7 +71,8 @@ class HttpClient(Client):
server_host_name: Optional[str] = None,
apply_server_timezone: Optional[Union[str, bool]] = None,
show_clickhouse_errors: Optional[bool] = None,
- autogenerate_session_id: Optional[bool] = None):
+ autogenerate_session_id: Optional[bool] = None,
+ tls_mode: Optional[str] = None):
"""
Create an HTTP ClickHouse Connect client
See clickhouse_connect.get_client for parameters
@@ -80,20 +82,20 @@ class HttpClient(Client):
ch_settings = dict_copy(settings, self.params)
self.http = pool_mgr
if interface == 'https':
+ if isinstance(verify, str) and verify.lower() == 'proxy':
+ verify = True
+ tls_mode = tls_mode or 'proxy'
if not https_proxy:
https_proxy = check_env_proxy('https', host, port)
- if https_proxy and isinstance(verify, str) and verify.lower() == 'proxy':
- verify = 'proxy'
- else:
- verify = coerce_bool(verify)
- if client_cert and verify != 'proxy':
+ verify = coerce_bool(verify)
+ if client_cert and (tls_mode is None or tls_mode == 'mutual'):
if not username:
raise ProgrammingError('username parameter is required for Mutual TLS authentication')
self.headers['X-ClickHouse-User'] = username
self.headers['X-ClickHouse-SSL-Certificate-Auth'] = 'on'
# pylint: disable=too-many-boolean-expressions
if not self.http and (server_host_name or ca_cert or client_cert or not verify or https_proxy):
- options = {'verify': verify is not False}
+ options = {'verify': verify}
dict_add(options, 'ca_cert', ca_cert)
dict_add(options, 'client_cert', client_cert)
dict_add(options, 'client_cert_key', client_cert_key)
@@ -111,7 +113,7 @@ class HttpClient(Client):
else:
self.http = default_pool_manager()
- if (not client_cert or verify == 'proxy') and username:
+ if (not client_cert or tls_mode in ('strict', 'proxy')) and username:
self.headers['Authorization'] = 'Basic ' + b64encode(f'{username}:{password}'.encode()).decode()
self.headers['User-Agent'] = common.build_client_name(client_name)
self._read_format = self._write_format = 'Native'
@@ -157,7 +159,7 @@ class HttpClient(Client):
server_host_name=server_host_name,
apply_server_timezone=apply_server_timezone,
show_clickhouse_errors=show_clickhouse_errors)
- self.params = self._validate_settings(ch_settings)
+ self.params = dict_copy(self.params, self._validate_settings(ch_settings))
comp_setting = self._setting_status('enable_http_compression')
self._send_comp_setting = not comp_setting.is_set and comp_setting.is_writable
if comp_setting.is_set or comp_setting.is_writable:
@@ -195,7 +197,7 @@ class HttpClient(Client):
context.block_info = True
params.update(context.bind_params)
params.update(self._validate_settings(context.settings))
- if columns_only_re.search(context.uncommented_query):
+ if not context.is_insert and columns_only_re.search(context.uncommented_query):
response = self._raw_request(f'{context.final_query}\n FORMAT JSON',
params, headers, retries=self.query_retries)
json_result = json.loads(response.data)
@@ -513,6 +515,9 @@ class HttpClient(Client):
logger.debug('ping failed', exc_info=True)
return False
+ def close_connections(self):
+ self.http.clear()
+
def close(self):
if self._owns_pool_manager:
self.http.clear()
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py
index 9a2b835c65..7dd73114e9 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py
@@ -6,7 +6,8 @@ import os
import sys
import socket
import time
-from typing import Dict, Any, Optional
+from collections import deque
+from typing import Dict, Any, Optional, Tuple, Callable
import certifi
import lz4.frame
@@ -192,34 +193,56 @@ class ResponseSource:
def __init__(self, response: HTTPResponse, chunk_size: int = 1024 * 1024):
self.response = response
compression = response.headers.get('content-encoding')
+ decompress:Optional[Callable] = None
if compression == 'zstd':
zstd_decom = zstandard.ZstdDecompressor().decompressobj()
- def decompress():
- while True:
- chunk = response.read(chunk_size, decode_content=False)
- if not chunk:
- break
- yield zstd_decom.decompress(chunk)
+ def zstd_decompress(c: deque) -> Tuple[bytes, int]:
+ chunk = c.popleft()
+ return zstd_decom.decompress(chunk), len(chunk)
- self.gen = decompress()
+ decompress = zstd_decompress
elif compression == 'lz4':
lz4_decom = lz4.frame.LZ4FrameDecompressor()
- def decompress():
+ def lz_decompress(c: deque) -> Tuple[Optional[bytes], int]:
+ read_amt = 0
while lz4_decom.needs_input:
- data = self.response.read(chunk_size, decode_content=False)
+ data = c.popleft()
+ read_amt += len(data)
if lz4_decom.unused_data:
data = lz4_decom.unused_data + data
- if not data:
- return
- chunk = lz4_decom.decompress(data)
- if chunk:
- yield chunk
-
- self.gen = decompress()
- else:
- self.gen = response.stream(amt=chunk_size, decode_content=True)
+ return lz4_decom.decompress(data), read_amt
+ return None, 0
+
+ decompress = lz_decompress
+
+ buffer_size = common.get_setting('http_buffer_size')
+
+ def buffered():
+ chunks = deque()
+ done = False
+ current_size = 0
+ read_gen = response.read_chunked(chunk_size, decompress is None)
+ while True:
+ while not done and current_size < buffer_size:
+ chunk = next(read_gen, None)
+ if not chunk:
+ done = True
+ break
+ chunks.append(chunk)
+ current_size += len(chunk)
+ if len(chunks) == 0:
+ return
+ if decompress:
+ chunk, used = decompress(chunks)
+ current_size -= used
+ else:
+ chunk = chunks.popleft()
+ current_size -= len(chunk)
+ yield chunk
+
+ self.gen = buffered()
def close(self):
self.response.drain_conn()
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/insert.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/insert.py
index 39a306cdc4..8ca1ef9f22 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/insert.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/insert.py
@@ -2,12 +2,12 @@ import logging
from math import log
from typing import Iterable, Sequence, Optional, Any, Dict, NamedTuple, Generator, Union, TYPE_CHECKING
-from clickhouse_connect.driver.query import quote_identifier
+from clickhouse_connect.driver.binding import quote_identifier
from clickhouse_connect.driver.ctypes import data_conv
from clickhouse_connect.driver.context import BaseQueryContext
from clickhouse_connect.driver.options import np, pd, pd_time_test
-from clickhouse_connect.driver.exceptions import ProgrammingError
+from clickhouse_connect.driver.exceptions import ProgrammingError, DataError
if TYPE_CHECKING:
from clickhouse_connect.datatypes.base import ClickHouseType
@@ -198,3 +198,6 @@ class InsertContext(BaseQueryContext):
data[ix] = data[ix].tolist()
self.column_oriented = True
return data
+
+ def data_error(self, error_message: str) -> DataError:
+ return DataError(f"Failed to write column '{self.column_name}': {error_message}")
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/parser.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/parser.py
index acdf9510e0..02bdc03cd5 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/parser.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/parser.py
@@ -142,7 +142,7 @@ def parse_columns(expr: str):
pos += 1
else:
if level == 0:
- if char == ' ':
+ if char in (' ', '='):
if label and not named:
names.append(unescape_identifier(label))
label = ''
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/query.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/query.py
index c11e128ec6..54edbeff09 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/query.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/query.py
@@ -1,22 +1,18 @@
-import ipaddress
import logging
import re
-import uuid
import pytz
-from enum import Enum
from io import IOBase
from typing import Any, Tuple, Dict, Sequence, Optional, Union, Generator
-from datetime import date, datetime, tzinfo
+from datetime import tzinfo
from pytz.exceptions import UnknownTimeZoneError
-from clickhouse_connect import common
from clickhouse_connect.driver import tzutil
+from clickhouse_connect.driver.binding import bind_query
from clickhouse_connect.driver.common import dict_copy, empty_gen, StreamContext
from clickhouse_connect.driver.external import ExternalData
from clickhouse_connect.driver.types import Matrix, Closable
-from clickhouse_connect.json_impl import any_to_json
from clickhouse_connect.driver.exceptions import StreamClosedError, ProgrammingError
from clickhouse_connect.driver.options import check_arrow, pd_extended_dtypes
from clickhouse_connect.driver.context import BaseQueryContext
@@ -29,7 +25,6 @@ limit_re = re.compile(r'\s+LIMIT($|\s)', re.IGNORECASE)
select_re = re.compile(r'(^|\s)SELECT\s', re.IGNORECASE)
insert_re = re.compile(r'(^|\s)INSERT\s*INTO', re.IGNORECASE)
command_re = re.compile(r'(^\s*)(' + commands + r')\s', re.IGNORECASE)
-external_bind_re = re.compile(r'{.+:.+}')
# pylint: disable=too-many-instance-attributes
@@ -38,7 +33,7 @@ class QueryContext(BaseQueryContext):
Argument/parameter object for queries. This context is used to set thread/query specific formats
"""
- # pylint: disable=duplicate-code,too-many-arguments,too-many-locals
+ # pylint: disable=duplicate-code,too-many-arguments,too-many-positional-arguments,too-many-locals
def __init__(self,
query: Union[str, bytes] = '',
parameters: Optional[Dict[str, Any]] = None,
@@ -176,6 +171,7 @@ class QueryContext(BaseQueryContext):
return None
return active_tz
+ # pylint disable=too-many-positional-arguments
def updated_copy(self,
query: Optional[Union[str, bytes]] = None,
parameters: Optional[Dict[str, Any]] = None,
@@ -343,159 +339,6 @@ class QueryResult(Closable):
self._block_gen = None
-BS = '\\'
-must_escape = (BS, '\'', '`', '\t', '\n')
-
-
-def quote_identifier(identifier: str):
- first_char = identifier[0]
- if first_char in ('`', '"') and identifier[-1] == first_char:
- # Identifier is already quoted, assume that it's valid
- return identifier
- return f'`{escape_str(identifier)}`'
-
-
-def finalize_query(query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]],
- server_tz: Optional[tzinfo] = None) -> str:
- while query.endswith(';'):
- query = query[:-1]
- if not parameters:
- return query
- if hasattr(parameters, 'items'):
- return query % {k: format_query_value(v, server_tz) for k, v in parameters.items()}
- return query % tuple(format_query_value(v) for v in parameters)
-
-
-def bind_query(query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]],
- server_tz: Optional[tzinfo] = None) -> Tuple[str, Dict[str, str]]:
- while query.endswith(';'):
- query = query[:-1]
- if not parameters:
- return query, {}
-
- binary_binds = None
- if isinstance(parameters, dict):
- binary_binds = {k: v for k, v in parameters.items() if k.startswith('$') and k.endswith('$') and len(k) > 1}
- for key in binary_binds.keys():
- del parameters[key]
- if external_bind_re.search(query) is None:
- query, bound_params = finalize_query(query, parameters, server_tz), {}
- else:
- bound_params = {f'param_{k}': format_bind_value(v, server_tz) for k, v in parameters.items()}
- if binary_binds:
- binary_query = query.encode()
- binary_indexes = {}
- for k, v in binary_binds.items():
- key = k.encode()
- item_index = 0
- while True:
- item_index = binary_query.find(key, item_index)
- if item_index == -1:
- break
- binary_indexes[item_index + len(key)] = key, v
- item_index += len(key)
- query = b''
- start = 0
- for loc in sorted(binary_indexes.keys()):
- key, value = binary_indexes[loc]
- query += binary_query[start:loc] + value + key
- start = loc
- query += binary_query[start:]
- return query, bound_params
-
-
-def format_str(value: str):
- return f"'{escape_str(value)}'"
-
-
-def escape_str(value: str):
- return ''.join(f'{BS}{c}' if c in must_escape else c for c in value)
-
-
-# pylint: disable=too-many-return-statements
-def format_query_value(value: Any, server_tz: tzinfo = pytz.UTC):
- """
- Format Python values in a ClickHouse query
- :param value: Python object
- :param server_tz: Server timezone for adjusting datetime values
- :return: Literal string for python value
- """
- if value is None:
- return 'NULL'
- if isinstance(value, str):
- return format_str(value)
- if isinstance(value, datetime):
- if value.tzinfo is not None or server_tz != pytz.UTC:
- value = value.astimezone(server_tz)
- return f"'{value.strftime('%Y-%m-%d %H:%M:%S')}'"
- if isinstance(value, date):
- return f"'{value.isoformat()}'"
- if isinstance(value, list):
- return f"[{', '.join(str_query_value(x, server_tz) for x in value)}]"
- if isinstance(value, tuple):
- return f"({', '.join(str_query_value(x, server_tz) for x in value)})"
- if isinstance(value, dict):
- if common.get_setting('dict_parameter_format') == 'json':
- return format_str(any_to_json(value).decode())
- pairs = [str_query_value(k, server_tz) + ':' + str_query_value(v, server_tz)
- for k, v in value.items()]
- return f"{{{', '.join(pairs)}}}"
- if isinstance(value, Enum):
- return format_query_value(value.value, server_tz)
- if isinstance(value, (uuid.UUID, ipaddress.IPv4Address, ipaddress.IPv6Address)):
- return f"'{value}'"
- return value
-
-
-def str_query_value(value: Any, server_tz: tzinfo = pytz.UTC):
- return str(format_query_value(value, server_tz))
-
-
-# pylint: disable=too-many-branches
-def format_bind_value(value: Any, server_tz: tzinfo = pytz.UTC, top_level: bool = True):
- """
- Format Python values in a ClickHouse query
- :param value: Python object
- :param server_tz: Server timezone for adjusting datetime values
- :param top_level: Flag for top level for nested structures
- :return: Literal string for python value
- """
-
- def recurse(x):
- return format_bind_value(x, server_tz, False)
-
- if value is None:
- return '\\N'
- if isinstance(value, str):
- if top_level:
- # At the top levels, strings must not be surrounded by quotes
- return escape_str(value)
- return format_str(value)
- if isinstance(value, datetime):
- value = value.astimezone(server_tz)
- val = value.strftime('%Y-%m-%d %H:%M:%S')
- if top_level:
- return val
- return f"'{val}'"
- if isinstance(value, date):
- if top_level:
- return value.isoformat()
- return f"'{value.isoformat()}'"
- if isinstance(value, list):
- return f"[{', '.join(recurse(x) for x in value)}]"
- if isinstance(value, tuple):
- return f"({', '.join(recurse(x) for x in value)})"
- if isinstance(value, dict):
- if common.get_setting('dict_parameter_format') == 'json':
- return any_to_json(value).decode()
- pairs = [recurse(k) + ':' + recurse(v)
- for k, v in value.items()]
- return f"{{{', '.join(pairs)}}}"
- if isinstance(value, Enum):
- return recurse(value.value)
- return str(value)
-
-
comment_re = re.compile(r"(\".*?\"|\'.*?\')|(/\*.*?\*/|(--\s)[^\n]*$)", re.MULTILINE | re.DOTALL)
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/tools.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/tools.py
index 54b4b45949..42480858d6 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/tools.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/tools.py
@@ -2,7 +2,7 @@ from typing import Optional, Sequence, Dict, Any
from clickhouse_connect.driver import Client
from clickhouse_connect.driver.summary import QuerySummary
-from clickhouse_connect.driver.query import quote_identifier
+from clickhouse_connect.driver.binding import quote_identifier
def insert_file(client: Client,
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/transform.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/transform.py
index 03206d0c85..1d20d9c6e7 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/transform.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/transform.py
@@ -96,7 +96,7 @@ class NativeTransform:
col_enc = col_name.encode()
write_leb128(len(col_enc), output)
output += col_enc
- col_enc = col_type.name.encode()
+ col_enc = col_type.insert_name.encode()
write_leb128(len(col_enc), output)
output += col_enc
context.start_column(col_name)
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/tools/testing.py b/contrib/python/clickhouse-connect/clickhouse_connect/tools/testing.py
index 7084c71a40..56ced72501 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/tools/testing.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/tools/testing.py
@@ -1,7 +1,7 @@
from typing import Sequence, Optional, Union, Dict, Any
from clickhouse_connect.driver import Client
-from clickhouse_connect.driver.query import quote_identifier, str_query_value
+from clickhouse_connect.driver.binding import quote_identifier, str_query_value
class TableContext: