author     robot-piglet <[email protected]>    2025-08-01 00:01:09 +0300
committer  robot-piglet <[email protected]>    2025-08-01 00:11:46 +0300
commit     75fd1fc757cc04e434a65784ae4ba6e28350878d (patch)
tree       def4a4c6e8a93c0f37b563a6bb86bc7936fc3912 /contrib/python
parent     f5d4ccd1e8d8054636ee31f953767a529801fcbf (diff)
Intermediate changes
commit_hash:11a36b37f1d393ab351897e8a0b5bf4de5871fe0
Diffstat (limited to 'contrib/python')
25 files changed, 516 insertions, 264 deletions
diff --git a/contrib/python/clickhouse-connect/.dist-info/METADATA b/contrib/python/clickhouse-connect/.dist-info/METADATA
index a8378b82544..39a36b2075e 100644
--- a/contrib/python/clickhouse-connect/.dist-info/METADATA
+++ b/contrib/python/clickhouse-connect/.dist-info/METADATA
@@ -1,6 +1,6 @@
-Metadata-Version: 2.1
+Metadata-Version: 2.4
 Name: clickhouse-connect
-Version: 0.8.10
+Version: 0.8.18
 Summary: ClickHouse Database Core Driver for Python, Pandas, and Superset
 Home-page: https://github.com/ClickHouse/clickhouse-connect
 Author: ClickHouse Inc.
@@ -36,6 +36,19 @@ Provides-Extra: orjson
 Requires-Dist: orjson; extra == "orjson"
 Provides-Extra: tzlocal
 Requires-Dist: tzlocal>=4.0; extra == "tzlocal"
+Dynamic: author
+Dynamic: author-email
+Dynamic: classifier
+Dynamic: description
+Dynamic: description-content-type
+Dynamic: home-page
+Dynamic: keywords
+Dynamic: license
+Dynamic: license-file
+Dynamic: provides-extra
+Dynamic: requires-dist
+Dynamic: requires-python
+Dynamic: summary
 
 ## ClickHouse Connect
 
@@ -81,4 +94,4 @@ See the [run_async example](./examples/run_async.py) for more details.
 
 ### Complete Documentation
 The documentation for ClickHouse Connect has moved to
-[ClickHouse Docs](https://clickhouse.com/docs/en/integrations/language-clients/python/intro)
+[ClickHouse Docs](https://clickhouse.com/docs/integrations/python)
diff --git a/contrib/python/clickhouse-connect/README.md b/contrib/python/clickhouse-connect/README.md
index 58defa499d0..83fbbf583ac 100644
--- a/contrib/python/clickhouse-connect/README.md
+++ b/contrib/python/clickhouse-connect/README.md
@@ -42,4 +42,4 @@ See the [run_async example](./examples/run_async.py) for more details.
 
 ### Complete Documentation
 The documentation for ClickHouse Connect has moved to
-[ClickHouse Docs](https://clickhouse.com/docs/en/integrations/language-clients/python/intro)
+[ClickHouse Docs](https://clickhouse.com/docs/integrations/python)
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py b/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py
index c0bfba807c0..07f89fe72be 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py
@@ -1 +1 @@
-version = '0.8.10'
+version = '0.8.18'
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/dialect.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/dialect.py
index c9bb24c95a6..0c1d7d79fe2 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/dialect.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/dialect.py
@@ -1,4 +1,5 @@
+from sqlalchemy import text
 from sqlalchemy.engine.default import DefaultDialect
 
 from clickhouse_connect import dbapi
@@ -46,8 +47,8 @@ class ClickHouseDialect(DefaultDialect):
 
     @staticmethod
     def has_database(connection, db_name):
-        return (connection.execute('SELECT name FROM system.databases ' +
-                                   f'WHERE name = {format_str(db_name)}')).rowcount > 0
+        return (connection.execute(text('SELECT name FROM system.databases ' +
+                                        f'WHERE name = {format_str(db_name)}'))).rowcount > 0
 
     def get_table_names(self, connection, schema=None, **kw):
         cmd = 'SHOW TABLES'
@@ -87,7 +88,7 @@ class ClickHouseDialect(DefaultDialect):
         return []
 
     def has_table(self, connection, table_name, schema=None, **_kw):
-        result = connection.execute(f'EXISTS TABLE {full_table(table_name, schema)}')
+        result = connection.execute(text(f'EXISTS TABLE {full_table(table_name, schema)}'))
         row = result.fetchone()
         return row[0] == 1
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/common.py b/contrib/python/clickhouse-connect/clickhouse_connect/common.py
index ab960e76b71..dd0f319dce9 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/common.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/common.py
@@ -41,8 +41,9 @@ def build_client_name(client_name: str):
         os_user = f'; os_user:{getpass.getuser()}'
     except Exception:  # pylint: disable=broad-except
         pass
-    return (f'{client_name}{product_name}clickhouse-connect/{version()}' +
-            f' (lv:py/{py_version}; mode:sync; os:{sys.platform}{os_user})')
+    full_name = (f'{client_name}{product_name}clickhouse-connect/{version()}' +
+                 f' (lv:py/{py_version}; mode:sync; os:{sys.platform}{os_user})')
+    return full_name.encode('ascii', 'ignore').decode()
 
 
 def get_setting(name: str):
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/base.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/base.py
index ab43b4e1dbb..c41694b71a7 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/base.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/base.py
@@ -126,16 +126,19 @@ class ClickHouseType(ABC):
         if self.low_card:
             write_uint64(low_card_version, dest)
 
-    def read_column_prefix(self, source: ByteSource, _ctx: QueryContext):
+    def read_column_prefix(self, source: ByteSource, _ctx: QueryContext) -> Any:
         """
         Read the low cardinality version.  Like the write method, this has to happen immediately for container classes
         :param source: The native protocol binary read buffer
-        :return: updated read pointer
+        :param _ctx: The current query context
+        :return: any state data required by the read_column_data method
         """
         if self.low_card:
             v = source.read_uint64()
             if v != low_card_version:
                 logger.warning('Unexpected low cardinality version %d reading type %s', v, self.name)
+            return v
+        return None
 
     def read_column(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence:
         """
@@ -144,30 +147,31 @@ class ClickHouseType(ABC):
         :param source: Native protocol binary read buffer
         :param num_rows: Number of rows expected in the column
         :param ctx: QueryContext for query specific settings
-        :return: The decoded column data as a sequence and the updated location pointer
+        :return: The decoded column data as a sequence
         """
-        self.read_column_prefix(source, ctx)
-        return self.read_column_data(source, num_rows, ctx)
+        read_state = self.read_column_prefix(source, ctx)
+        return self.read_column_data(source, num_rows, ctx, read_state)
 
-    def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence:
+    def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext, read_state: Any) -> Sequence:
         """
         Public read method for all ClickHouseType data type columns
         :param source: Native protocol binary read buffer
         :param num_rows: Number of rows expected in the column
         :param ctx: QueryContext for query specific settings
-        :return: The decoded column plus the updated location pointer
+        :param read_state: Any information returned by the read_column_prefix method
+        :return: The decoded column
         """
         if self.low_card:
-            column = self._read_low_card_column(source, num_rows, ctx)
+            column = self._read_low_card_column(source, num_rows, ctx, read_state)
         elif self.nullable:
-            column = self._read_nullable_column(source, num_rows, ctx)
+            column = self._read_nullable_column(source, num_rows, ctx, read_state)
         else:
-            column = self._read_column_binary(source, num_rows, ctx)
+            column = self._read_column_binary(source, num_rows, ctx, read_state)
         return self._finalize_column(column, ctx)
 
-    def _read_nullable_column(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence:
+    def _read_nullable_column(self, source: ByteSource, num_rows: int, ctx: QueryContext, read_state: Any) -> Sequence:
         null_map = source.read_bytes(num_rows)
-        column = self._read_column_binary(source, num_rows, ctx)
+        column = self._read_column_binary(source, num_rows, ctx, read_state)
         null_obj = self._active_null(ctx)
         return data_conv.build_nullable_column(column, null_map, null_obj)
 
@@ -177,7 +181,8 @@ class ClickHouseType(ABC):
 
     # pylint: disable=no-self-use
     def _read_column_binary(self, _source: ByteSource,
-                            _num_rows: int, _ctx: QueryContext) -> Union[Sequence, MutableSequence]:
+                            _num_rows: int, _ctx: QueryContext,
+                            _read_state: Any) -> Union[Sequence, MutableSequence]:
         """
         Lowest level read method for ClickHouseType native data columns
         :param _source: Native protocol binary read buffer
@@ -224,13 +229,13 @@ class ClickHouseType(ABC):
         self._write_column_binary(column, dest, ctx)
 
     # pylint: disable=no-member
-    def _read_low_card_column(self, source: ByteSource, num_rows: int, ctx: QueryContext):
+    def _read_low_card_column(self, source: ByteSource, num_rows: int, ctx: QueryContext, read_state: Any):
         if num_rows == 0:
             return []
         key_data = source.read_uint64()
         key_sz = 2 ** (key_data & 0xff)
         index_cnt = source.read_uint64()
-        index = self._read_column_binary(source, index_cnt, ctx)
+        index = self._read_column_binary(source, index_cnt, ctx, read_state)
         key_cnt = source.read_uint64()
         keys = source.read_array(array_type(key_sz, False), key_cnt)
         if self.nullable:
@@ -313,12 +318,12 @@ class ArrayType(ClickHouseType, ABC, registered=False):
             cls._struct_type = '<' + cls._array_type
             cls.byte_size = array.array(cls._array_type).itemsize
 
-    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext):
+    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext, _read_state: Any):
        if ctx.use_numpy:
            return numpy_conv.read_numpy_array(source, self.np_type, num_rows)
        return source.read_array(self._array_type, num_rows)
 
-    def _read_nullable_column(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence:
+    def _read_nullable_column(self, source: ByteSource, num_rows: int, ctx: QueryContext, _read_state: Any) -> Sequence:
         return data_conv.read_nullable_array(source, self._array_type, num_rows, self._active_null(ctx))
 
     def _build_lc_column(self, index: Sequence, keys: array.array, ctx: QueryContext):
@@ -357,7 +362,7 @@ class UnsupportedType(ClickHouseType, ABC, registered=False):
         super().__init__(type_def)
         self._name_suffix = type_def.arg_str
 
-    def _read_column_binary(self, source: Sequence, num_rows: int, ctx: QueryContext):
+    def _read_column_binary(self, source: Sequence, num_rows: int, ctx: QueryContext, read_state: Any):
         raise NotSupportedError(f'{self.name} deserialization not supported')
 
     def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext):
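The datatypes/base.py hunks above rework column reads into an explicit two-phase protocol: `read_column_prefix` now returns per-column state instead of relying on side effects, and `read_column_data` receives that state. A toy sketch of the calling pattern (simplified stand-in classes, not the driver's real `ByteSource`/`QueryContext`):

```python
from typing import Any, Sequence


class ToyType:
    """Stand-in for ClickHouseType illustrating the new read flow."""
    low_card = False

    def read_column_prefix(self, source: list, _ctx) -> Any:
        # Phase 1: consume any per-column prefix and return it as state;
        # scalar types return None, container types return nested state.
        return source.pop(0) if self.low_card else None

    def read_column_data(self, source: list, num_rows: int, _ctx, read_state: Any) -> Sequence:
        # Phase 2: decode the column body using the phase-1 state.
        return [source.pop(0) for _ in range(num_rows)]

    def read_column(self, source: list, num_rows: int, ctx) -> Sequence:
        state = self.read_column_prefix(source, ctx)
        return self.read_column_data(source, num_rows, ctx, state)


buf = [10, 20, 30]
assert ToyType().read_column(buf, 3, None) == [10, 20, 30]
```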
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/container.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/container.py
index c36577127c9..5908c75765b 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/container.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/container.py
@@ -1,6 +1,6 @@
 import array
 import logging
-from typing import Sequence, Collection
+from typing import Sequence, Collection, Any
 
 from clickhouse_connect.driver.insert import InsertContext
 from clickhouse_connect.driver.query import QueryContext
@@ -15,15 +15,20 @@ logger = logging.getLogger(__name__)
 
 
 class Array(ClickHouseType):
-    __slots__ = ('element_type',)
+    __slots__ = ('element_type', '_insert_name')
     python_type = list
 
+    @property
+    def insert_name(self):
+        return self._insert_name
+
     def __init__(self, type_def: TypeDef):
         super().__init__(type_def)
         self.element_type = get_from_name(type_def.values[0])
         self._name_suffix = f'({self.element_type.name})'
+        self._insert_name = f'Array({self.element_type.insert_name})'
 
-    def read_column_prefix(self, source: ByteSource, ctx:QueryContext):
+    def read_column_prefix(self, source: ByteSource, ctx: QueryContext):
         return self.element_type.read_column_prefix(source, ctx)
 
     def _data_size(self, sample: Sequence) -> int:
@@ -35,7 +40,7 @@ class Array(ClickHouseType):
         return total // len(sample) + 8
 
     # pylint: disable=too-many-locals
-    def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext):
+    def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext, read_state: Any):
         final_type = self.element_type
         depth = 1
         while isinstance(final_type, Array):
@@ -48,7 +53,7 @@ class Array(ClickHouseType):
             offset_sizes.append(level_offsets)
             level_size = level_offsets[-1] if level_offsets else 0
         if level_size:
-            all_values = final_type.read_column_data(source, level_size, ctx)
+            all_values = final_type.read_column_data(source, level_size, ctx, read_state)
         else:
             all_values = []
         column = all_values if isinstance(all_values, list) else list(all_values)
@@ -86,10 +91,14 @@
 
 
 class Tuple(ClickHouseType):
-    _slots = 'element_names', 'element_types'
+    _slots = 'element_names', 'element_types', '_insert_name'
     python_type = tuple
     valid_formats = 'tuple', 'dict', 'json', 'native'  # native is 'tuple' for unnamed tuples, and dict for named tuples
 
+    @property
+    def insert_name(self):
+        return self._insert_name
+
     def __init__(self, type_def: TypeDef):
         super().__init__(type_def)
         self.element_names = type_def.keys
@@ -98,6 +107,11 @@
             self._name_suffix = f"({', '.join(quote_identifier(k) + ' ' + str(v) for k, v in zip(type_def.keys, type_def.values))})"
         else:
             self._name_suffix = type_def.arg_str
+        if self.element_names:
+            self._insert_name = \
+                f"Tuple({', '.join(quote_identifier(k) + ' ' + v.insert_name for k, v in zip(type_def.keys, self.element_types))})"
+        else:
+            self._insert_name = f"Tuple({', '.join(v.insert_name for v in self.element_types)})"
 
     def _data_size(self, sample: Collection) -> int:
         if len(sample) == 0:
@@ -114,14 +128,13 @@
         return elem_size
 
     def read_column_prefix(self, source: ByteSource, ctx: QueryContext):
-        for e_type in self.element_types:
-            e_type.read_column_prefix(source, ctx)
+        return [e_type.read_column_prefix(source, ctx) for e_type in self.element_types]
 
-    def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext):
+    def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext, read_state: Any):
         columns = []
         e_names = self.element_names
-        for e_type in self.element_types:
-            column = e_type.read_column_data(source, num_rows, ctx)
+        for ix, e_type in enumerate(self.element_types):
+            column = e_type.read_column_data(source, num_rows, ctx, read_state[ix])
             columns.append(column)
         if e_names and self.read_format(ctx) != 'tuple':
             dicts = [{} for _ in range(num_rows)]
@@ -156,14 +169,19 @@
 
 
 class Map(ClickHouseType):
-    _slots = 'key_type', 'value_type'
+    _slots = 'key_type', 'value_type', '_insert_name'
     python_type = dict
 
+    @property
+    def insert_name(self):
+        return self._insert_name
+
     def __init__(self, type_def: TypeDef):
         super().__init__(type_def)
         self.key_type = get_from_name(type_def.values[0])
         self.value_type = get_from_name(type_def.values[1])
         self._name_suffix = type_def.arg_str
+        self._insert_name = f'Map({self.key_type.insert_name}, {self.value_type.insert_name})'
 
     def _data_size(self, sample: Collection) -> int:
         total = 0
@@ -175,15 +193,16 @@
         return total // len(sample)
 
     def read_column_prefix(self, source: ByteSource, ctx: QueryContext):
-        self.key_type.read_column_prefix(source, ctx)
-        self.value_type.read_column_prefix(source, ctx)
+        key_state = self.key_type.read_column_prefix(source, ctx)
+        value_state = self.value_type.read_column_prefix(source, ctx)
+        return key_state, value_state
 
     # pylint: disable=too-many-locals
-    def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext):
+    def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext, read_state: Any):
         offsets = source.read_array('Q', num_rows)
         total_rows = 0 if len(offsets) == 0 else offsets[-1]
-        keys = self.key_type.read_column_data(source, total_rows, ctx)
-        values = self.value_type.read_column_data(source, total_rows, ctx)
+        keys = self.key_type.read_column_data(source, total_rows, ctx, read_state[0])
+        values = self.value_type.read_column_data(source, total_rows, ctx, read_state[1])
         all_pairs = tuple(zip(keys, values))
         column = []
         app = column.append
@@ -231,12 +250,12 @@ class Nested(ClickHouseType):
         array_sample = [[tuple(sub_row[key] for key in keys) for sub_row in row] for row in sample]
         return self.tuple_array.data_size(array_sample)
 
-    def read_column_prefix(self, source: ByteSource, ctx:QueryContext):
-        self.tuple_array.read_column_prefix(source, ctx)
+    def read_column_prefix(self, source: ByteSource, ctx: QueryContext):
+        return self.tuple_array.read_column_prefix(source, ctx)
 
-    def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext):
+    def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext, read_state: Any):
         keys = self.element_names
-        data = self.tuple_array.read_column_data(source, num_rows, ctx)
+        data = self.tuple_array.read_column_data(source, num_rows, ctx, read_state)
         return [[dict(zip(keys, x)) for x in row] for row in data]
 
     def write_column_prefix(self, dest: bytearray):
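The container hunks above build `_insert_name` recursively, so `Array`, `Tuple`, and `Map` now advertise an insert-side type string composed from their element types. A hedged usage sketch (assuming `get_from_name` resolves type strings as it does inside the diff; the exact output depends on each element type's own `insert_name`):

```python
from clickhouse_connect.datatypes.registry import get_from_name

# Composed recursively by the Array/Map __init__ changes above.
ch_type = get_from_name('Map(String, Array(Int64))')
print(ch_type.insert_name)  # expected: 'Map(String, Array(Int64))'
```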
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/dynamic.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/dynamic.py
index 6222400154c..c5057b2b1b1 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/dynamic.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/dynamic.py
@@ -1,4 +1,5 @@
-from typing import List, Sequence, Collection
+from collections import namedtuple
+from typing import List, Sequence, Collection, Any
 
 from clickhouse_connect.datatypes.base import ClickHouseType, TypeDef
 from clickhouse_connect.datatypes.registry import get_from_name
@@ -16,31 +17,40 @@ STRING_DATA_TYPE: ClickHouseType
 
 json_serialization_format = 0x1
 
+VariantState = namedtuple('VariantState', 'discriminator_node element_states')
+
+
 class Variant(ClickHouseType):
     _slots = 'element_types'
     python_type = object
 
     def __init__(self, type_def: TypeDef):
         super().__init__(type_def)
-        self.element_types:List[ClickHouseType] = [get_from_name(name) for name in type_def.values]
+        self.element_types: List[ClickHouseType] = [get_from_name(name) for name in type_def.values]
         self._name_suffix = f"({', '.join(ch_type.name for ch_type in self.element_types)})"
 
     @property
     def insert_name(self):
         return 'String'
 
-    def read_column_prefix(self, source: ByteSource, ctx:QueryContext):
-        if source.read_uint64() != 0:
-            raise DataError(f'Unexpected discriminator format in Variant column {ctx.column_name}')
+    def read_column_prefix(self, source: ByteSource, ctx: QueryContext) -> VariantState:
+        discriminator_mode = source.read_uint64()
+        element_states = [e_type.read_column_prefix(source, ctx) for e_type in self.element_types]
+        return VariantState(discriminator_mode, element_states)
 
-    def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence:
-        return read_variant_column(self.element_types, source, num_rows, ctx)
+    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext,
+                            read_state: VariantState) -> Sequence:
+        return read_variant_column(source, num_rows, ctx, self.element_types, read_state.element_states)
 
     def write_column_data(self, column: Sequence, dest: bytearray, ctx: InsertContext):
         write_str_values(self, column, dest, ctx)
 
 
-def read_variant_column(variant_types: List[ClickHouseType], source: ByteSource, num_rows:int, ctx: QueryContext) -> Sequence:
+def read_variant_column(source: ByteSource,
+                        num_rows: int,
+                        ctx: QueryContext,
+                        variant_types: List[ClickHouseType],
+                        element_states: List[Any]) -> Sequence:
     v_count = len(variant_types)
     discriminators = source.read_array('B', num_rows)
     # We have to count up how many of each discriminator there are in the block to read the sub columns correctly
@@ -52,7 +62,7 @@ def read_variant_column(variant_types: List[ClickHouseType], source: ByteSource,
     # Read all the sub-columns
     for ix in range(v_count):
         if disc_rows[ix] > 0:
-            sub_columns[ix] = variant_types[ix].read_column_data(source, disc_rows[ix], ctx)
+            sub_columns[ix] = variant_types[ix].read_column_data(source, disc_rows[ix], ctx, element_states[ix])
     # Now we have to walk through each of the discriminators again to assign the correct value from
     # the sub-column to the final result column
     sub_indexes = [0] * v_count
@@ -67,40 +77,45 @@
     return col
 
 
+DynamicState = namedtuple('DynamicState', 'struct_version variant_types variant_states')
+
+
+def read_dynamic_prefix(_, source: ByteSource, ctx: QueryContext) -> DynamicState:
+    struct_version = source.read_uint64()
+    if struct_version == 1:
+        source.read_leb128()  # max dynamic types, we ignore this value
+    elif struct_version != 2:
+        raise DataError('Unrecognized dynamic structure version')
+    num_variants = source.read_leb128()
+    variant_types = [get_from_name(source.read_leb128_str()) for _ in range(num_variants)]
+    variant_types.append(STRING_DATA_TYPE)
+    if source.read_uint64() != 0:  # discriminator format, currently only 0 is recognized
+        raise DataError('Unexpected discriminator format in Variant column prefix')
+    variant_states = [e_type.read_column_prefix(source, ctx) for e_type in variant_types]
+    return DynamicState(struct_version, variant_types, variant_states)
+
+
 class Dynamic(ClickHouseType):
     python_type = object
+    read_column_prefix = read_dynamic_prefix
 
     @property
     def insert_name(self):
         return 'String'
 
-    def __init__(self, type_def:TypeDef):
+    def __init__(self, type_def: TypeDef):
         super().__init__(type_def)
         if type_def.keys and type_def.keys[0] == 'max_types':
             self._name_suffix = f'(max_types={type_def.values[0]})'
 
-    def read_column(self, source: ByteSource, num_rows:int, ctx:QueryContext):
-        variant_types = read_dynamic_prefix(source)
-        return read_variant_column(variant_types, source, num_rows, ctx)
+    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext,
+                            read_state: DynamicState) -> Sequence:
+        return read_variant_column(source, num_rows, ctx, read_state.variant_types, read_state.variant_states)
 
     def write_column_data(self, column: Sequence, dest: bytearray, ctx: InsertContext):
         write_str_values(self, column, dest, ctx)
 
 
-def read_dynamic_prefix(source: ByteSource) -> List[ClickHouseType]:
-    serialize_version = source.read_uint64()
-    if serialize_version == 1:
-        source.read_leb128()  # max dynamic types, we ignore this value
-    elif serialize_version != 2:
-        raise DataError('Unrecognized dynamic structure version')
-    num_variants = source.read_leb128()
-    variant_types = [get_from_name(source.read_leb128_str()) for _ in range(num_variants)]
-    variant_types.append(STRING_DATA_TYPE)
-    if source.read_uint64() != 0:  # discriminator format, currently only 0 is recognized
-        raise DataError('Unexpected discriminator format in Variant column prefix')
-    return variant_types
-
-
 def json_sample_size(_, sample: Collection) -> int:
     if len(sample) == 0:
         return 0
@@ -113,7 +128,7 @@ def json_sample_size(_, sample: Collection) -> int:
     return total // len(sample) + 1
 
 
-def write_json(ch_type:ClickHouseType, column: Sequence, dest: bytearray, ctx: InsertContext):
+def write_json(ch_type: ClickHouseType, column: Sequence, dest: bytearray, ctx: InsertContext):
     first = first_value(column, ch_type.nullable)
     write_col = column
     encoding = ctx.encoding or ch_type.encoding
@@ -124,7 +139,7 @@
     handle_error(data_conv.write_str_col(write_col, ch_type.nullable, encoding, dest), ctx)
 
 
-def write_str_values(ch_type:ClickHouseType, column: Sequence, dest: bytearray, ctx: InsertContext):
+def write_str_values(ch_type: ClickHouseType, column: Sequence, dest: bytearray, ctx: InsertContext):
     encoding = ctx.encoding or ch_type.encoding
     col = [''] * len(column)
     for ix, v in enumerate(column):
@@ -135,6 +150,9 @@
     handle_error(data_conv.write_str_col(col, False, encoding, dest), ctx)
 
 
+JSONState = namedtuple('JSONState', 'serialize_version dynamic_paths typed_states dynamic_states')
+
+
 class JSON(ClickHouseType):
     _slots = 'typed_paths', 'typed_types'
     python_type = dict
@@ -148,7 +166,7 @@ class JSON(ClickHouseType):
     typed_types = []
     skips = []
 
-    def __init__(self, type_def:TypeDef):
+    def __init__(self, type_def: TypeDef):
         super().__init__(type_def)
         typed_paths = []
         typed_types = []
@@ -200,25 +218,26 @@ class JSON(ClickHouseType):
         if json_serialization_format > 0:
             write_uint64(json_serialization_format, dest)
 
-    def read_column_prefix(self, source: ByteSource, ctx: QueryContext):
+    def read_column_prefix(self, source: ByteSource, ctx: QueryContext) -> JSONState:
         serialize_version = source.read_uint64()
         if serialize_version == 0:
             source.read_leb128()  # max dynamic types, we ignore this value
         elif serialize_version != 2:
-            raise DataError(f'Unrecognized dynamic structure version: {serialize_version} column: `{ctx.column_name}`')
-
-    # pylint: disable=too-many-locals
-    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext):
+            raise DataError(f'Unrecognized json structure version: {serialize_version} column: `{ctx.column_name}`')
         dynamic_path_cnt = source.read_leb128()
         dynamic_paths = [source.read_leb128_str() for _ in range(dynamic_path_cnt)]
-        for typed in self.typed_types:
-            typed.read_column_prefix(source, ctx)
-        dynamic_variants = [read_dynamic_prefix(source) for _ in range(dynamic_path_cnt)]
-        # C++ prefix read ends here
-
-        typed_columns = [ch_type.read_column_data(source, num_rows, ctx) for ch_type in self.typed_types]
-        dynamic_columns = [read_variant_column(dynamic_variants[ix], source, num_rows, ctx) for ix in range(dynamic_path_cnt)]
-        SHARED_DATA_TYPE.read_column_data(source, num_rows, ctx)
+        typed_states = [typed.read_column_prefix(source, ctx) for typed in self.typed_types]
+        dynamic_states = [read_dynamic_prefix(self, source, ctx) for _ in range(dynamic_path_cnt)]
+        return JSONState(serialize_version, dynamic_paths, typed_states, dynamic_states)
+
+    # pylint: disable=too-many-locals
+    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext, read_state: JSONState):
+        typed_columns = [ch_type.read_column_data(source, num_rows, ctx, read_state)
+                         for ch_type, read_state in zip(self.typed_types, read_state.typed_states)]
+        dynamic_columns = [
+            read_variant_column(source, num_rows, ctx, dynamic_state.variant_types, dynamic_state.variant_states)
+            for dynamic_state in read_state.dynamic_states]
+        SHARED_DATA_TYPE.read_column_data(source, num_rows, ctx, None)
         col = []
         for row_num in range(num_rows):
             top = {}
@@ -233,7 +252,7 @@
                         item[key] = child
                     item = child
                 item[chain[-1]] = value
-            for ix, field in enumerate(dynamic_paths):
+            for ix, field in enumerate(read_state.dynamic_paths):
                 value = dynamic_columns[ix][row_num]
                 if value is None:
                     continue
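`read_variant_column` above reads one discriminator byte per row, decodes each sub-column densely, then walks the discriminators a second time to interleave values into the result. A self-contained sketch of that second walk (255 is assumed here to mark NULL rows; sub-columns are pre-decoded lists):

```python
discriminators = [0, 1, 255, 0]           # one entry per output row
sub_columns = [['a', 'b'], [42], None]    # densely decoded per-variant columns

sub_indexes = [0] * len(sub_columns)      # next unread position per sub-column
col = []
for disc in discriminators:
    if disc == 255:                       # NULL discriminator
        col.append(None)
    else:
        col.append(sub_columns[disc][sub_indexes[disc]])
        sub_indexes[disc] += 1

assert col == ['a', 42, None, 'b']
```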
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/geometric.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/geometric.py
index 67b342b895b..04c94eba561 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/geometric.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/geometric.py
@@ -1,4 +1,4 @@
-from typing import Sequence
+from typing import Sequence, Any
 
 from clickhouse_connect.datatypes.base import ClickHouseType
 from clickhouse_connect.driver.insert import InsertContext
@@ -15,32 +15,44 @@ class Point(ClickHouseType):
     def write_column(self, column: Sequence, dest: bytearray, ctx: InsertContext):
         return POINT_DATA_TYPE.write_column(column, dest, ctx)
 
-    def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence:
-        return POINT_DATA_TYPE.read_column_data(source, num_rows, ctx)
+    def read_column_prefix(self, source: ByteSource, ctx: QueryContext):
+        return POINT_DATA_TYPE.read_column_prefix(source, ctx)
+
+    def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext, read_state: Any) -> Sequence:
+        return POINT_DATA_TYPE.read_column_data(source, num_rows, ctx, read_state)
 
 
 class Ring(ClickHouseType):
     def write_column(self, column: Sequence, dest: bytearray, ctx: InsertContext):
         return RING_DATA_TYPE.write_column(column, dest, ctx)
 
-    def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence:
-        return RING_DATA_TYPE.read_column_data(source, num_rows, ctx)
+    def read_column_prefix(self, source: ByteSource, ctx: QueryContext):
+        return RING_DATA_TYPE.read_column_prefix(source, ctx)
+
+    def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext, read_state) -> Sequence:
+        return RING_DATA_TYPE.read_column_data(source, num_rows, ctx, read_state)
 
 
 class Polygon(ClickHouseType):
     def write_column(self, column: Sequence, dest: bytearray, ctx: InsertContext):
         return POLYGON_DATA_TYPE.write_column(column, dest, ctx)
 
-    def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence:
-        return POLYGON_DATA_TYPE.read_column_data(source, num_rows, ctx)
+    def read_column_prefix(self, source: ByteSource, ctx: QueryContext):
+        return POLYGON_DATA_TYPE.read_column_prefix(source, ctx)
+
+    def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext, read_state:Any) -> Sequence:
+        return POLYGON_DATA_TYPE.read_column_data(source, num_rows, ctx, read_state)
 
 
 class MultiPolygon(ClickHouseType):
     def write_column(self, column: Sequence, dest: bytearray, ctx: InsertContext):
         return MULTI_POLYGON_DATA_TYPE.write_column(column, dest, ctx)
 
-    def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence:
-        return MULTI_POLYGON_DATA_TYPE.read_column_data(source, num_rows, ctx)
+    def read_column_prefix(self, source: ByteSource, ctx: QueryContext):
+        return MULTI_POLYGON_DATA_TYPE.read_column_prefix(source, ctx)
+
+    def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext, read_state:Any) -> Sequence:
+        return MULTI_POLYGON_DATA_TYPE.read_column_data(source, num_rows, ctx, read_state)
 
 
 class LineString(Ring):
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/network.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/network.py
index f44686367d3..064b612feb2 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/network.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/network.py
@@ -1,6 +1,6 @@
 import socket
 from ipaddress import IPv4Address, IPv6Address
-from typing import Union, MutableSequence, Sequence
+from typing import Union, MutableSequence, Sequence, Any
 
 from clickhouse_connect.datatypes.base import ClickHouseType
 from clickhouse_connect.driver.common import write_array, int_size, first_value
@@ -20,7 +20,7 @@ class IPv4(ClickHouseType):
     python_type = IPv4Address
     byte_size = 4
 
-    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext):
+    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext, _read_state: Any):
         if self.read_format(ctx) == 'int':
             return source.read_array(self._array_type, num_rows)
         if self.read_format(ctx) == 'string':
@@ -58,7 +58,7 @@ class IPv6(ClickHouseType):
     python_type = IPv6Address
     byte_size = 16
 
-    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext):
+    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext, _read_state: Any):
         if self.read_format(ctx) == 'string':
             return self._read_binary_str(source, num_rows)
         return self._read_binary_ip(source, num_rows)
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/numeric.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/numeric.py
index a9629dd3e0d..7e86b623e3f 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/numeric.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/numeric.py
@@ -1,5 +1,5 @@
 import decimal
-from typing import Union, Type, Sequence, MutableSequence
+from typing import Union, Type, Sequence, MutableSequence, Any
 
 from math import nan, isnan, isinf
 
@@ -72,7 +72,7 @@ class UInt64(IntBase):
     np_type = '<u8'
     python_type = int
 
-    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext):
+    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext, _read_state: Any):
         fmt = self.read_format(ctx)
         if ctx.use_numpy:
             np_type = '<q' if fmt == 'signed' else '<u8'
@@ -80,7 +80,7 @@ class UInt64(IntBase):
         arr_type = 'q' if fmt == 'signed' else 'Q'
         return source.read_array(arr_type, num_rows)
 
-    def _read_nullable_column(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence:
+    def _read_nullable_column(self, source: ByteSource, num_rows: int, ctx: QueryContext, _read_state: Any) -> Sequence:
         return data_conv.read_nullable_array(source, 'q' if self.read_format(ctx) == 'signed' else 'Q',
                                              num_rows, self._active_null(ctx))
 
@@ -98,8 +98,9 @@ class UInt64(IntBase):
 class BigInt(ClickHouseType, registered=False):
     _signed = True
     valid_formats = 'string', 'native'
+    python_type = int
 
-    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext):
+    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext, _read_state: Any):
         signed = self._signed
         sz = self.byte_size
         column = []
@@ -212,7 +213,7 @@ class Bool(ClickHouseType):
     python_type = bool
     byte_size = 1
 
-    def _read_column_binary(self, source: ByteSource, num_rows: int, _ctx: QueryContext):
+    def _read_column_binary(self, source: ByteSource, num_rows: int, _ctx: QueryContext, _read_state: Any):
         column = source.read_bytes(num_rows)
         return [b != 0 for b in column]
 
@@ -243,7 +244,7 @@ class Enum(ClickHouseType):
         val_str = ', '.join(f"'{key}' = {value}" for key, value in zip(escaped_keys, type_def.values))
         self._name_suffix = f'({val_str})'
 
-    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext):
+    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext, _read_state: Any):
         column = source.read_array(self._array_type, num_rows)
         if self.read_format(ctx) == 'int':
             return column
@@ -299,7 +300,7 @@ class Decimal(ClickHouseType):
             self._name_suffix = f'({prec}, {scale})'
         self._array_type = array_type(self.byte_size, True)
 
-    def _read_column_binary(self, source: ByteSource, num_rows: int, _ctx: QueryContext):
+    def _read_column_binary(self, source: ByteSource, num_rows: int, _ctx: QueryContext, _read_state: Any):
         column = source.read_array(self._array_type, num_rows)
         dec = decimal.Decimal
         scale = self.scale
@@ -336,7 +337,7 @@ class Decimal(ClickHouseType):
 
 
 class BigDecimal(Decimal, registered=False):
-    def _read_column_binary(self, source: ByteSource, num_rows: int, _ctx):
+    def _read_column_binary(self, source: ByteSource, num_rows: int, _ctx: QueryContext, _read_state: Any):
         dec = decimal.Decimal
         scale = self.scale
         prec = self.prec
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/special.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/special.py
index 9d296d00fc4..3443f09d1f6 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/special.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/special.py
@@ -1,4 +1,4 @@
-from typing import Union, Sequence, MutableSequence
+from typing import Union, Sequence, MutableSequence, Any
 from uuid import UUID as PYUUID
 
 from clickhouse_connect.datatypes.base import TypeDef, ClickHouseType, ArrayType, UnsupportedType
@@ -20,7 +20,7 @@ class UUID(ClickHouseType):
     def python_null(self, ctx):
         return '' if self.read_format(ctx) == 'string' else PYUUID(int=0)
 
-    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext):
+    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext, _read_state: Any):
         if self.read_format(ctx) == 'string':
             return self._read_binary_str(source, num_rows)
         return data_conv.read_uuid_col(source, num_rows)
@@ -89,6 +89,9 @@ class SimpleAggregateFunction(ClickHouseType):
         self.element_type: ClickHouseType = get_from_name(type_def.values[1])
         self._name_suffix = type_def.arg_str
         self.byte_size = self.element_type.byte_size
+        self.np_type = self.element_type.np_type
+        self.python_type = self.element_type.python_type
+        self.nano_divisor = self.element_type.nano_divisor
 
     def _data_size(self, sample: Sequence) -> int:
         return self.element_type.data_size(sample)
@@ -99,8 +102,8 @@ class SimpleAggregateFunction(ClickHouseType):
     def write_column_prefix(self, dest: bytearray):
         self.element_type.write_column_prefix(dest)
 
-    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext):
-        return self.element_type.read_column_data(source, num_rows, ctx)
+    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext, read_state: Any):
+        return self.element_type.read_column_data(source, num_rows, ctx, read_state)
 
     def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext):
         self.element_type.write_column_data(column, dest, ctx)
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/string.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/string.py
index 4b7886510d1..bf322ffa120 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/string.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/string.py
@@ -1,4 +1,4 @@
-from typing import Sequence, MutableSequence, Union, Collection
+from typing import Sequence, MutableSequence, Union, Collection, Any
 
 from clickhouse_connect.driver.common import first_value
 from clickhouse_connect.driver.ctypes import data_conv
@@ -30,10 +30,10 @@ class String(ClickHouseType):
             total += len(x)
         return total // len(sample) + 1
 
-    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext):
+    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext, _read_state: Any):
         return source.read_str_col(num_rows, self._active_encoding(ctx))
 
-    def _read_nullable_column(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence:
+    def _read_nullable_column(self, source: ByteSource, num_rows: int, ctx: QueryContext, read_state: Any) -> Sequence:
         return source.read_str_col(num_rows, self._active_encoding(ctx), True, self._active_null(ctx))
 
     def _finalize_column(self, column: Sequence, ctx: QueryContext) -> Sequence:
@@ -75,7 +75,7 @@ class FixedString(ClickHouseType):
     def np_type(self):
         return f'<U{self.byte_size}'
 
-    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext):
+    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext, _read_state: Any):
         if self.read_format(ctx) == 'string':
             return source.read_fixed_str_col(self.byte_size, num_rows, ctx.encoding or self.encoding )
         return source.read_bytes_col(self.byte_size, num_rows)
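The special.py hunk above copies `np_type`, `python_type`, and `nano_divisor` from the wrapped element type, so a `SimpleAggregateFunction` column reports the same dtype metadata as its element. A toy illustration of the delegation (simplified classes, not the driver's):

```python
class Element:
    np_type = '<u8'
    python_type = int
    nano_divisor = 1


class Wrapper:
    def __init__(self, element):
        self.element_type = element
        # Mirrors the diff: copy metadata instead of keeping class defaults
        self.np_type = element.np_type
        self.python_type = element.python_type
        self.nano_divisor = element.nano_divisor


w = Wrapper(Element())
assert w.np_type == '<u8' and w.python_type is int
```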
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/temporal.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/temporal.py
index 39bf0c070da..4b416a9159f 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/temporal.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/temporal.py
@@ -1,7 +1,7 @@
 import pytz
 
 from datetime import date, datetime, tzinfo
-from typing import Union, Sequence, MutableSequence
+from typing import Union, Sequence, MutableSequence, Any
 
 from clickhouse_connect.datatypes.base import TypeDef, ClickHouseType
 from clickhouse_connect.driver.common import write_array, np_date_types, int_size, first_value
@@ -24,7 +24,7 @@ class Date(ClickHouseType):
     python_type = date
     byte_size = 2
 
-    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext):
+    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext, _read_state:Any):
         if self.read_format(ctx) == 'int':
             return source.read_array(self._array_type, num_rows)
         if ctx.use_numpy:
@@ -71,7 +71,7 @@ class Date32(Date):
     byte_size = 4
     _array_type = 'l' if int_size == 2 else 'i'
 
-    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext):
+    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext, _read_state: Any):
         if ctx.use_numpy:
             return numpy_conv.read_numpy_array(source, '<i4', num_rows).astype(self.np_type)
         if self.read_format(ctx) == 'int':
@@ -111,7 +111,7 @@ class DateTime(DateTimeBase):
         else:
             self.tzinfo = None
 
-    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext):
+    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext, _read_state: Any) -> Sequence:
         if self.read_format(ctx) == 'int':
             return source.read_array(self._array_type, num_rows)
         active_tz = ctx.active_tz(self.tzinfo)
@@ -161,7 +161,7 @@ class DateTime64(DateTimeBase):
     def nano_divisor(self):
         return 1000000000 // self.prec
 
-    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext):
+    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext, _read_state: Any) -> Sequence:
         if self.read_format(ctx) == 'int':
             return source.read_array('q', num_rows)
         active_tz = ctx.active_tz(self.tzinfo)
@@ -202,6 +202,18 @@ class DateTime64(DateTimeBase):
         if isinstance(first, int) or self.write_format(ctx) == 'int':
             if self.nullable:
                 column = [x if x else 0 for x in column]
+        elif isinstance(first, str):
+            original_column = column
+            column = []
+
+            for x in original_column:
+                if not x and self.nullable:
+                    v = 0
+                else:
+                    dt = datetime.fromisoformat(x)
+                    v = ((int(dt.timestamp()) * 1000000 + dt.microsecond) * self.prec) // 1000000
+
+                column.append(v)
         else:
             prec = self.prec
             if self.nullable:
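The temporal.py hunk above lets `DateTime64` inserts accept ISO 8601 strings, converting each value via `datetime.fromisoformat`. A hedged usage sketch (table, column, and host are hypothetical):

```python
import clickhouse_connect

# Hypothetical table: CREATE TABLE events (ts DateTime64(3)) ENGINE = MergeTree ORDER BY ts
client = clickhouse_connect.get_client(host='localhost')
client.insert(
    'events',
    [['2025-08-01T00:01:09+03:00'],   # ISO 8601 strings are now converted
     ['2025-08-01 00:11:46']],        # per row via datetime.fromisoformat
    column_names=['ts'],
)
```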
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/dbapi/cursor.py b/contrib/python/clickhouse-connect/clickhouse_connect/dbapi/cursor.py
index 64eabcba347..9388b131485 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/dbapi/cursor.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/dbapi/cursor.py
@@ -57,6 +57,10 @@ class Cursor:
         self.data = query_result.result_set
         self._rowcount = len(self.data)
         self._summary.append(query_result.summary)
+
+        # Need to reset cursor _ix after performing an execute
+        self._ix = 0
+
         if query_result.column_names:
             self.names = query_result.column_names
             self.types = [x.name for x in query_result.column_types]
@@ -106,9 +110,12 @@ class Cursor:
             raise ProgrammingError(f'Invalid parameters {parameters} passed to cursor executemany') from ex
         self._rowcount = len(self.data)
 
+        # Need to reset cursor _ix after performing an execute
+        self._ix = 0
+
     def fetchall(self):
         self.check_valid()
-        ret = self.data
+        ret = self.data[self._ix:]
         self._ix = self._rowcount
         return ret
@@ -122,7 +129,15 @@ class Cursor:
 
     def fetchmany(self, size: int = -1):
         self.check_valid()
-        end = self._ix + max(size, self._rowcount - self._ix)
+
+        if size < 0:
+            # Fetch all remaining rows
+            size = self._rowcount - self._ix
+        elif size == 0:
+            # Return empty list for size=0
+            return []
+
+        end = min(self._ix + size, self._rowcount)
         ret = self.data[self._ix: end]
         self._ix = end
         return ret
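The cursor.py hunks fix the DB-API paging semantics: `_ix` now resets on every execute, `fetchall` returns only the unread rows, and `fetchmany` clamps to the remaining rows instead of overshooting. A sketch of the resulting behavior (connection parameters are placeholders):

```python
from clickhouse_connect.dbapi import connect

conn = connect(host='localhost')
cur = conn.cursor()
cur.execute('SELECT number FROM system.numbers LIMIT 10')

first = cur.fetchmany(3)   # rows 0-2; the cursor index advances to 3
empty = cur.fetchmany(0)   # size=0 now returns an empty list
rest = cur.fetchall()      # only the 7 remaining rows, not all 10
```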
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py
index 58efb5f8a83..644bac7b96a 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py
@@ -13,9 +13,10 @@ from clickhouse_connect.driver.asyncclient import AsyncClient
 
 # pylint: disable=too-many-arguments,too-many-locals,too-many-branches
 def create_client(*,
-                  host: str = None,
-                  username: str = None,
+                  host: Optional[str] = None,
+                  username: Optional[str] = None,
                   password: str = '',
+                  access_token: Optional[str] = None,
                   database: str = '__default__',
                   interface: Optional[str] = None,
                   port: int = 0,
@@ -29,7 +30,11 @@ def create_client(*,
 
     :param host: The hostname or IP address of the ClickHouse server.  If not set, localhost will be used.
     :param username: The ClickHouse username.  If not set, the default ClickHouse user will be used.
+        Should not be set if `access_token` is used.
     :param password: The password for username.
+        Should not be set if `access_token` is used.
+    :param access_token: JWT access token (ClickHouse Cloud feature).
+        Should not be set if `username`/`password` are used.
     :param database: The default database for the connection.  If not set, ClickHouse Connect will use the
        default database for username.
    :param interface: Must be http or https.  Defaults to http, or to https if port is set to 8443 or 443
@@ -90,6 +95,8 @@ def create_client(*,
     if not interface:
         interface = 'https' if use_tls else 'http'
     port = port or default_port(interface, use_tls)
+    if access_token and (username or password != ''):
+        raise ProgrammingError('Cannot use both access_token and username/password')
     if username is None and 'user' in kwargs:
         username = kwargs.pop('user')
     if username is None and 'user_name' in kwargs:
@@ -112,7 +119,8 @@ def create_client(*,
             if name.startswith('ch_'):
                 name = name[3:]
             settings[name] = value
-        return HttpClient(interface, host, port, username, password, database, settings=settings, **kwargs)
+        return HttpClient(interface, host, port, username, password, database, access_token,
+                          settings=settings, **kwargs)
     raise ProgrammingError(f'Unrecognized client type {interface}')
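driver/__init__.py above adds a JWT `access_token` alternative to username/password (a ClickHouse Cloud feature) and rejects mixing the two. A hedged sketch (host and token values are placeholders):

```python
import clickhouse_connect

client = clickhouse_connect.get_client(
    host='myservice.clickhouse.cloud',
    access_token='<jwt-access-token>',  # mutually exclusive with username/password
)

# Tokens can be rotated on a live client (see set_access_token in the
# asyncclient.py diff below):
client.set_access_token('<refreshed-jwt>')
```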
@@ -90,7 +98,7 @@ class AsyncClient: column_formats=column_formats, encoding=encoding, use_none=use_none, column_oriented=column_oriented, use_numpy=use_numpy, max_str_len=max_str_len, context=context, query_tz=query_tz, column_tzs=column_tzs, - external_data=external_data) + external_data=external_data, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query) @@ -107,7 +115,9 @@ class AsyncClient: context: QueryContext = None, query_tz: Optional[Union[str, tzinfo]] = None, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None, + ) -> StreamContext: """ Variation of main query method that returns a stream of column oriented blocks. For parameters, see the create_query_context method. @@ -119,7 +129,7 @@ class AsyncClient: query_formats=query_formats, column_formats=column_formats, encoding=encoding, use_none=use_none, context=context, query_tz=query_tz, column_tzs=column_tzs, - external_data=external_data) + external_data=external_data, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_column_block_stream) @@ -136,7 +146,8 @@ class AsyncClient: context: QueryContext = None, query_tz: Optional[Union[str, tzinfo]] = None, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Variation of main query method that returns a stream of row oriented blocks. For parameters, see the create_query_context method. @@ -148,7 +159,7 @@ class AsyncClient: query_formats=query_formats, column_formats=column_formats, encoding=encoding, use_none=use_none, context=context, query_tz=query_tz, column_tzs=column_tzs, - external_data=external_data) + external_data=external_data, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_row_block_stream) @@ -165,7 +176,8 @@ class AsyncClient: context: QueryContext = None, query_tz: Optional[Union[str, tzinfo]] = None, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Variation of main query method that returns a stream of row oriented blocks. For parameters, see the create_query_context method. @@ -177,7 +189,7 @@ class AsyncClient: query_formats=query_formats, column_formats=column_formats, encoding=encoding, use_none=use_none, context=context, query_tz=query_tz, column_tzs=column_tzs, - external_data=external_data) + external_data=external_data, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_rows_stream) @@ -189,7 +201,8 @@ class AsyncClient: settings: Optional[Dict[str, Any]] = None, fmt: str = None, use_database: bool = True, - external_data: Optional[ExternalData] = None) -> bytes: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> bytes: """ Query method that simply returns the raw ClickHouse format bytes. 
:param query: Query statement/format string @@ -199,12 +212,14 @@ class AsyncClient: :param use_database Send the database parameter to ClickHouse so the command will be executed in the client database context :param external_data External data to send with the query + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: bytes representing raw ClickHouse return value based on format """ def _raw_query(): return self.client.raw_query(query=query, parameters=parameters, settings=settings, fmt=fmt, - use_database=use_database, external_data=external_data) + use_database=use_database, external_data=external_data, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _raw_query) @@ -215,7 +230,8 @@ class AsyncClient: settings: Optional[Dict[str, Any]] = None, fmt: str = None, use_database: bool = True, - external_data: Optional[ExternalData] = None) -> io.IOBase: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> io.IOBase: """ Query method that returns the result as an io.IOBase iterator. :param query: Query statement/format string @@ -225,12 +241,13 @@ class AsyncClient: :param use_database Send the database parameter to ClickHouse so the command will be executed in the client database context :param external_data External data to send with the query + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: io.IOBase stream/iterator for the result """ def _raw_stream(): return self.client.raw_stream(query=query, parameters=parameters, settings=settings, fmt=fmt, - use_database=use_database, external_data=external_data) + use_database=use_database, external_data=external_data, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _raw_stream) @@ -246,7 +263,8 @@ class AsyncClient: use_none: Optional[bool] = None, max_str_len: Optional[int] = None, context: QueryContext = None, - external_data: Optional[ExternalData] = None): + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None): """ Query method that returns the results as a numpy array. For parameter values, see the create_query_context method. @@ -257,7 +275,7 @@ class AsyncClient: return self.client.query_np(query=query, parameters=parameters, settings=settings, query_formats=query_formats, column_formats=column_formats, encoding=encoding, use_none=use_none, max_str_len=max_str_len, context=context, - external_data=external_data) + external_data=external_data, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_np) @@ -273,7 +291,8 @@ class AsyncClient: use_none: Optional[bool] = None, max_str_len: Optional[int] = None, context: QueryContext = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Query method that returns the results as a stream of numpy arrays. For parameter values, see the create_query_context method. 
@@ -284,7 +303,7 @@ class AsyncClient: return self.client.query_np_stream(query=query, parameters=parameters, settings=settings, query_formats=query_formats, column_formats=column_formats, encoding=encoding, use_none=use_none, max_str_len=max_str_len, - context=context, external_data=external_data) + context=context, external_data=external_data, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_np_stream) @@ -304,7 +323,8 @@ class AsyncClient: column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, context: QueryContext = None, external_data: Optional[ExternalData] = None, - use_extended_dtypes: Optional[bool] = None): + use_extended_dtypes: Optional[bool] = None, + transport_settings: Optional[Dict[str, str]] = None): """ Query method that results the results as a pandas dataframe. For parameter values, see the create_query_context method. @@ -316,7 +336,8 @@ class AsyncClient: query_formats=query_formats, column_formats=column_formats, encoding=encoding, use_none=use_none, max_str_len=max_str_len, use_na_values=use_na_values, query_tz=query_tz, column_tzs=column_tzs, context=context, - external_data=external_data, use_extended_dtypes=use_extended_dtypes) + external_data=external_data, use_extended_dtypes=use_extended_dtypes, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_df) @@ -336,7 +357,8 @@ class AsyncClient: column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, context: QueryContext = None, external_data: Optional[ExternalData] = None, - use_extended_dtypes: Optional[bool] = None) -> StreamContext: + use_extended_dtypes: Optional[bool] = None, + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Query method that returns the results as a StreamContext. For parameter values, see the create_query_context method. @@ -349,7 +371,8 @@ class AsyncClient: encoding=encoding, use_none=use_none, max_str_len=max_str_len, use_na_values=use_na_values, query_tz=query_tz, column_tzs=column_tzs, context=context, - external_data=external_data, use_extended_dtypes=use_extended_dtypes) + external_data=external_data, use_extended_dtypes=use_extended_dtypes, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_df_stream) @@ -373,7 +396,8 @@ class AsyncClient: streaming: bool = False, as_pandas: bool = False, external_data: Optional[ExternalData] = None, - use_extended_dtypes: Optional[bool] = None) -> QueryContext: + use_extended_dtypes: Optional[bool] = None, + transport_settings: Optional[Dict[str, str]] = None) -> QueryContext: """ Creates or updates a reusable QueryContext object :param query: Query statement/format string @@ -403,6 +427,7 @@ class AsyncClient: :param use_extended_dtypes: Only relevant to Pandas Dataframe queries. Use Pandas "missing types", such as pandas.NA and pandas.NaT for ClickHouse NULL values, as well as extended Pandas dtypes such as IntegerArray and StringArray. Defaulted to True for query_df methods + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) 
:return: Reusable QueryContext """ @@ -415,14 +440,16 @@ class AsyncClient: use_na_values=use_na_values, streaming=streaming, as_pandas=as_pandas, external_data=external_data, - use_extended_dtypes=use_extended_dtypes) + use_extended_dtypes=use_extended_dtypes, + transport_settings=transport_settings) async def query_arrow(self, query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, settings: Optional[Dict[str, Any]] = None, use_strings: Optional[bool] = None, - external_data: Optional[ExternalData] = None): + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None): """ Query method using the ClickHouse Arrow format to return a PyArrow table :param query: Query statement/format string @@ -430,12 +457,14 @@ class AsyncClient: :param settings: Optional dictionary of ClickHouse settings (key/string values) :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary) :param external_data ClickHouse "external data" to send with query + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: PyArrow.Table """ def _query_arrow(): return self.client.query_arrow(query=query, parameters=parameters, settings=settings, - use_strings=use_strings, external_data=external_data) + use_strings=use_strings, external_data=external_data, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_arrow) @@ -446,7 +475,8 @@ class AsyncClient: parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, settings: Optional[Dict[str, Any]] = None, use_strings: Optional[bool] = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Query method that returns the results as a stream of Arrow tables :param query: Query statement/format string @@ -454,12 +484,14 @@ class AsyncClient: :param settings: Optional dictionary of ClickHouse settings (key/string values) :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary) :param external_data ClickHouse "external data" to send with query + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: Generator that yields a PyArrow.Table per block representing the result set """ def _query_arrow_stream(): return self.client.query_arrow_stream(query=query, parameters=parameters, settings=settings, - use_strings=use_strings, external_data=external_data) + use_strings=use_strings, external_data=external_data, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_arrow_stream) @@ -471,7 +503,8 @@ class AsyncClient: data: Union[str, bytes] = None, settings: Dict[str, Any] = None, use_database: bool = True, - external_data: Optional[ExternalData] = None) -> Union[str, int, Sequence[str], QuerySummary]: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> Union[str, int, Sequence[str], QuerySummary]: """ Client method that returns a single value instead of a result set :param cmd: ClickHouse query/command as a python format string :param parameters: Optional dictionary used to format the query :param data: Optional 'data' for the command (for INSERT INTO in particular) :param settings: Optional dictionary of ClickHouse settings (key/string values) :param use_database: Send the database parameter to ClickHouse so the command will be executed in the client database context. Otherwise, no database will be specified with the command.
This is useful for determining the default user database :param external_data ClickHouse "external data" to send with command/query + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: Decoded response from ClickHouse as either a string, int, or sequence of strings, or QuerySummary if no data returned """ def _command(): return self.client.command(cmd=cmd, parameters=parameters, data=data, settings=settings, - use_database=use_database, external_data=external_data) + use_database=use_database, external_data=external_data, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _command) @@ -516,7 +551,8 @@ class AsyncClient: column_type_names: Sequence[str] = None, column_oriented: bool = False, settings: Optional[Dict[str, Any]] = None, - context: InsertContext = None) -> QuerySummary: + context: InsertContext = None, + transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary: """ Method to insert multiple rows/data matrix of native Python objects. If context is specified arguments other than data are ignored @@ -533,13 +569,15 @@ class AsyncClient: :param settings: Optional dictionary of ClickHouse settings (key/string values) :param context: Optional reusable insert context to allow repeated inserts into the same table with different data batches + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: QuerySummary with summary information, throws exception if insert fails """ def _insert(): return self.client.insert(table=table, data=data, column_names=column_names, database=database, column_types=column_types, column_type_names=column_type_names, - column_oriented=column_oriented, settings=settings, context=context) + column_oriented=column_oriented, settings=settings, context=context, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _insert) @@ -552,7 +590,8 @@ class AsyncClient: column_names: Optional[Sequence[str]] = None, column_types: Sequence[ClickHouseType] = None, column_type_names: Sequence[str] = None, - context: InsertContext = None) -> QuerySummary: + context: InsertContext = None, + transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary: """ Insert a pandas DataFrame into ClickHouse. If context is specified arguments other than df are ignored :param table: ClickHouse table @@ -567,6 +606,7 @@ class AsyncClient: retrieved from the server :param context: Optional reusable insert context to allow repeated inserts into the same table with different data batches + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) 
:return: QuerySummary with summary information, throws exception if insert fails """ def _insert_df(): return self.client.insert_df(table=table, df=df, database=database, settings=settings, column_names=column_names, column_types=column_types, column_type_names=column_type_names, - context=context) + context=context, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _insert_df) @@ -582,18 +622,21 @@ class AsyncClient: async def insert_arrow(self, table: str, arrow_table, database: str = None, - settings: Optional[Dict] = None) -> QuerySummary: + settings: Optional[Dict] = None, + transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary: """ Insert a PyArrow Table into ClickHouse using raw Arrow format :param table: ClickHouse table :param arrow_table: PyArrow Table object :param database: Optional ClickHouse database :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: QuerySummary with summary information, throws exception if insert fails """ def _insert_arrow(): - return self.client.insert_arrow(table=table, arrow_table=arrow_table, database=database, settings=settings) + return self.client.insert_arrow(table=table, arrow_table=arrow_table, database=database, + settings=settings, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _insert_arrow) @@ -607,7 +650,8 @@ class AsyncClient: column_type_names: Sequence[str] = None, column_oriented: bool = False, settings: Optional[Dict[str, Any]] = None, - data: Optional[Sequence[Sequence[Any]]] = None) -> InsertContext: + data: Optional[Sequence[Sequence[Any]]] = None, + transport_settings: Optional[Dict[str, str]] = None) -> InsertContext: """ Builds a reusable insert context to hold state for the duration of an insert :param table: Target table :param column_names: Optional ordered list of column names. If not set, all columns of the table will be used in the table column order :param database: Target database. If not set, uses the client default database :param column_types: ClickHouse column types. Optional Sequence of ClickHouseType objects. If neither column types nor column type names are set, actual column types are retrieved from the server :param column_type_names: ClickHouse column type names. Specified column types by name string :param column_oriented: If true the data is already "pivoted" in column form :param settings: Optional dictionary of ClickHouse settings (key/string values) :param data: Initial dataset for insert - :return Reusable insert context + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.)
+ :return: Reusable insert context """ def _create_insert_context(): return self.client.create_insert_context(table=table, column_names=column_names, database=database, column_types=column_types, column_type_names=column_type_names, - column_oriented=column_oriented, settings=settings, data=data) + column_oriented=column_oriented, settings=settings, data=data, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _create_insert_context) @@ -652,7 +698,8 @@ class AsyncClient: insert_block: Union[str, bytes, Generator[bytes, None, None], BinaryIO] = None, settings: Optional[Dict] = None, fmt: Optional[str] = None, - compression: Optional[str] = None) -> QuerySummary: + compression: Optional[str] = None, + transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary: """ Insert data already formatted in a bytes object :param table: Table name (whether qualified with the database name or not) @@ -660,13 +707,21 @@ class AsyncClient: :param insert_block: Binary or string data already in a recognized ClickHouse format :param settings: Optional dictionary of ClickHouse settings (key/string values) :param compression: Recognized ClickHouse `Accept-Encoding` header compression value + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :param fmt: Valid clickhouse format """ def _raw_insert(): return self.client.raw_insert(table=table, column_names=column_names, insert_block=insert_block, - settings=settings, fmt=fmt, compression=compression) + settings=settings, fmt=fmt, compression=compression, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _raw_insert) return result + + async def __aenter__(self) -> "AsyncClient": + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + await self.close() diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/binding.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/binding.py index 24522b08801..1fa2dd09a71 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/binding.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/binding.py @@ -40,8 +40,7 @@ def quote_identifier(identifier: str): def finalize_query(query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]], server_tz: Optional[tzinfo] = None) -> str: - while query.endswith(';'): - query = query[:-1] + query = query.rstrip(";") if not parameters: return query if hasattr(parameters, 'items'): @@ -52,8 +51,7 @@ def finalize_query(query: str, parameters: Optional[Union[Sequence, Dict[str, An # pylint: disable=too-many-locals,too-many-branches def bind_query(query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]], server_tz: Optional[tzinfo] = None) -> Tuple[str, Dict[str, str]]: - while query.endswith(';'): - query = query[:-1] + query = query.rstrip(";") if not parameters: return query, {} diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py index 144f7b8913a..86556f46c77 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py @@ -19,6 +19,7 @@ from clickhouse_connect.driver.constants import CH_VERSION_WITH_PROTOCOL, PROTOC from clickhouse_connect.driver.exceptions import ProgrammingError, 
OperationalError from clickhouse_connect.driver.external import ExternalData from clickhouse_connect.driver.insert import InsertContext +from clickhouse_connect.driver.options import check_arrow, check_pandas, check_numpy from clickhouse_connect.driver.summary import QuerySummary from clickhouse_connect.driver.models import ColumnDef, SettingDef, SettingStatus from clickhouse_connect.driver.query import QueryResult, to_arrow, to_arrow_batches, QueryContext, arrow_buffer @@ -68,7 +69,7 @@ class Client(ABC): self.uri = uri self._init_common_settings(apply_server_timezone) - def _init_common_settings(self, apply_server_timezone:Optional[Union[str, bool]] ): + def _init_common_settings(self, apply_server_timezone: Optional[Union[str, bool]]): self.server_tz, dst_safe = pytz.UTC, True self.server_version, server_tz = \ tuple(self.command('SELECT version(), timezone()', use_database=False)) @@ -102,12 +103,11 @@ class Client(ABC): if self._setting_status('date_time_input_format').is_writable: self.set_client_setting('date_time_input_format', 'best_effort') if self._setting_status('allow_experimental_json_type').is_set and \ - self._setting_status('cast_string_to_dynamic_user_inference').is_writable: + self._setting_status('cast_string_to_dynamic_use_inference').is_writable: self.set_client_setting('cast_string_to_dynamic_use_inference', '1') if self.min_version('24.8') and not self.min_version('24.10'): dynamic_module.json_serialization_format = 0 - def _validate_settings(self, settings: Optional[Dict[str, Any]]) -> Dict[str, str]: """ This strips any ClickHouse settings that are not recognized or are read only. @@ -123,8 +123,16 @@ class Client(ABC): return validated def _validate_setting(self, key: str, value: Any, invalid_action: str) -> Optional[str]: + str_value = str(value) + if value is True: + str_value = '1' + elif value is False: + str_value = '0' if key not in self.valid_transport_settings: setting_def = self.server_settings.get(key) + current_setting = self.get_client_setting(key) + if setting_def and setting_def.value == str_value and (current_setting is None or current_setting == setting_def.value): + return None # don't send settings that are already the expected value if setting_def is None or setting_def.readonly: if key in self.optional_transport_settings: return None @@ -135,9 +143,7 @@ class Client(ABC): return None else: raise ProgrammingError(f'Setting {key} is unknown or readonly') from None - if isinstance(value, bool): - return '1' if value else '0' - return str(value) + return str_value def _setting_status(self, key: str) -> SettingStatus: comp_setting = self.server_settings.get(key) @@ -184,6 +190,13 @@ class Client(ABC): :return: The string value of the setting, if it exists, or None """ + @abstractmethod + def set_access_token(self, access_token: str): + """ + Set the ClickHouse access token for the client + :param access_token: Access token string + """ + # pylint: disable=unused-argument,too-many-locals def query(self, query: Optional[str] = None, @@ -199,7 +212,8 @@ class Client(ABC): context: QueryContext = None, query_tz: Optional[Union[str, tzinfo]] = None, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, - external_data: Optional[ExternalData] = None) -> QueryResult: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> QueryResult: """ Main query method for SELECT, DESCRIBE and other SQL statements that return a result matrix. 
For parameters, see the create_query_context method @@ -215,7 +229,8 @@ class Client(ABC): response = self.command(query, parameters=query_context.parameters, settings=query_context.settings, - external_data=query_context.external_data) + external_data=query_context.external_data, + transport_settings=query_context.transport_settings) if isinstance(response, QuerySummary): return response.as_query_result() return QueryResult([response] if isinstance(response, list) else [[response]]) @@ -232,7 +247,8 @@ class Client(ABC): context: QueryContext = None, query_tz: Optional[Union[str, tzinfo]] = None, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Variation of main query method that returns a stream of column oriented blocks. For parameters, see the create_query_context method. @@ -251,7 +267,8 @@ class Client(ABC): context: QueryContext = None, query_tz: Optional[Union[str, tzinfo]] = None, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Variation of main query method that returns a stream of row oriented blocks. For parameters, see the create_query_context method. @@ -270,7 +287,8 @@ class Client(ABC): context: QueryContext = None, query_tz: Optional[Union[str, tzinfo]] = None, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Variation of main query method that returns a stream of row oriented blocks. For parameters, see the create_query_context method. @@ -284,16 +302,18 @@ class Client(ABC): settings: Optional[Dict[str, Any]] = None, fmt: str = None, use_database: bool = True, - external_data: Optional[ExternalData] = None) -> bytes: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> bytes: """ Query method that simply returns the raw ClickHouse format bytes :param query: Query statement/format string :param parameters: Optional dictionary used to format the query :param settings: Optional dictionary of ClickHouse settings (key/string values) :param fmt: ClickHouse output format - :param use_database Send the database parameter to ClickHouse so the command will be executed in the client + :param use_database: Send the database parameter to ClickHouse so the command will be executed in the client database context. - :param external_data External data to send with the query + :param external_data: External data to send with the query + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) 
:return: bytes representing raw ClickHouse return value based on format """ @@ -303,7 +323,8 @@ class Client(ABC): settings: Optional[Dict[str, Any]] = None, fmt: str = None, use_database: bool = True, - external_data: Optional[ExternalData] = None) -> io.IOBase: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> io.IOBase: """ Query method that returns the result as an io.IOBase iterator :param query: Query statement/format string @@ -312,7 +333,8 @@ class Client(ABC): :param fmt: ClickHouse output format :param use_database Send the database parameter to ClickHouse so the command will be executed in the client database context. - :param external_data External data to send with the query + :param external_data: External data to send with the query. + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: io.IOBase stream/iterator for the result """ @@ -327,12 +349,14 @@ class Client(ABC): use_none: Optional[bool] = None, max_str_len: Optional[int] = None, context: QueryContext = None, - external_data: Optional[ExternalData] = None): + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None): """ Query method that returns the results as a numpy array. For parameter values, see the create_query_context method :return: Numpy array representing the result set """ + check_numpy() return self._context_query(locals(), use_numpy=True).np_result # pylint: disable=duplicate-code,too-many-arguments,unused-argument @@ -346,12 +370,14 @@ class Client(ABC): use_none: Optional[bool] = None, max_str_len: Optional[int] = None, context: QueryContext = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Query method that returns the results as a stream of numpy arrays. For parameter values, see the create_query_context method :return: Generator that yields a numpy array per block representing the result set """ + check_numpy() return self._context_query(locals(), use_numpy=True, streaming=True).np_stream # pylint: disable=duplicate-code,unused-argument @@ -369,12 +395,14 @@ class Client(ABC): column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, context: QueryContext = None, external_data: Optional[ExternalData] = None, - use_extended_dtypes: Optional[bool] = None): + use_extended_dtypes: Optional[bool] = None, + transport_settings: Optional[Dict[str, str]] = None): """ Query method that returns the results as a pandas dataframe. For parameter values, see the create_query_context method :return: Pandas dataframe representing the result set """ + check_pandas() return self._context_query(locals(), use_numpy=True, as_pandas=True).df_result # pylint: disable=duplicate-code,unused-argument @@ -392,12 +420,14 @@ class Client(ABC): column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, context: QueryContext = None, external_data: Optional[ExternalData] = None, - use_extended_dtypes: Optional[bool] = None) -> StreamContext: + use_extended_dtypes: Optional[bool] = None, + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Query method that returns the results as a StreamContext.
For parameter values, see the create_query_context method :return: Generator that yields a Pandas dataframe per block representing the result set """ + check_pandas() return self._context_query(locals(), use_numpy=True, as_pandas=True, streaming=True).df_stream @@ -420,7 +450,8 @@ class Client(ABC): streaming: bool = False, as_pandas: bool = False, external_data: Optional[ExternalData] = None, - use_extended_dtypes: Optional[bool] = None) -> QueryContext: + use_extended_dtypes: Optional[bool] = None, + transport_settings: Optional[Dict[str, str]] = None) -> QueryContext: """ Creates or updates a reusable QueryContext object :param query: Query statement/format string @@ -438,10 +469,10 @@ class Client(ABC): structured array even with ClickHouse variable length String columns. If 0, Numpy arrays for String columns will always be object arrays :param context: An existing QueryContext to be updated with any provided parameter values - :param query_tz Either a string or a pytz tzinfo object. (Strings will be converted to tzinfo objects). + :param query_tz: Either a string or a pytz tzinfo object. (Strings will be converted to tzinfo objects). Values for any DateTime or DateTime64 column in the query will be converted to Python datetime.datetime objects with the selected timezone. - :param column_tzs A dictionary of column names to tzinfo objects (or strings that will be converted to + :param column_tzs: A dictionary of column names to tzinfo objects (or strings that will be converted to tzinfo objects). The timezone will be applied to datetime objects returned in the query :param use_na_values: Deprecated alias for use_advanced_dtypes :param as_pandas Return the result columns as pandas.Series objects @@ -450,6 +481,7 @@ class Client(ABC): :param use_extended_dtypes: Only relevant to Pandas Dataframe queries. Use Pandas "missing types", such as pandas.NA and pandas.NaT for ClickHouse NULL values, as well as extended Pandas dtypes such as IntegerArray and StringArray. Defaulted to True for query_df methods + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) 
:return: Reusable QueryContext """ if context: @@ -469,7 +501,8 @@ class Client(ABC): as_pandas=as_pandas, use_extended_dtypes=use_extended_dtypes, streaming=streaming, - external_data=external_data) + external_data=external_data, + transport_settings=transport_settings) if use_numpy and max_str_len is None: max_str_len = 0 if use_extended_dtypes is None: @@ -493,51 +526,60 @@ class Client(ABC): as_pandas=as_pandas, streaming=streaming, apply_server_tz=self.apply_server_timezone, - external_data=external_data) + external_data=external_data, + transport_settings=transport_settings) def query_arrow(self, query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, settings: Optional[Dict[str, Any]] = None, use_strings: Optional[bool] = None, - external_data: Optional[ExternalData] = None): + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None): """ Query method using the ClickHouse Arrow format to return a PyArrow table :param query: Query statement/format string :param parameters: Optional dictionary used to format the query :param settings: Optional dictionary of ClickHouse settings (key/string values) - :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary) - :param external_data ClickHouse "external data" to send with query + :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary) + :param external_data: ClickHouse "external data" to send with query + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: PyArrow.Table """ + check_arrow() settings = self._update_arrow_settings(settings, use_strings) return to_arrow(self.raw_query(query, parameters, settings, fmt='Arrow', - external_data=external_data)) + external_data=external_data, + transport_settings=transport_settings)) def query_arrow_stream(self, query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, settings: Optional[Dict[str, Any]] = None, use_strings: Optional[bool] = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Query method that returns the results as a stream of Arrow tables :param query: Query statement/format string :param parameters: Optional dictionary used to format the query :param settings: Optional dictionary of ClickHouse settings (key/string values) - :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary) - :param external_data ClickHouse "external data" to send with query + :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary) + :param external_data: ClickHouse "external data" to send with query + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) 
:return: Generator that yields a PyArrow.Table per block representing the result set """ + check_arrow() settings = self._update_arrow_settings(settings, use_strings) return to_arrow_batches(self.raw_stream(query, parameters, settings, fmt='ArrowStream', - external_data=external_data)) + external_data=external_data, + transport_settings=transport_settings)) def _update_arrow_settings(self, settings: Optional[Dict[str, Any]], @@ -562,7 +604,8 @@ class Client(ABC): data: Union[str, bytes] = None, settings: Dict[str, Any] = None, use_database: bool = True, - external_data: Optional[ExternalData] = None) -> Union[str, int, Sequence[str], QuerySummary]: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> Union[str, int, Sequence[str], QuerySummary]: """ Client method that returns a single value instead of a result set :param cmd: ClickHouse query/command as a python format string :param parameters: Optional dictionary used to format the query :param data: Optional 'data' for the command (for INSERT INTO in particular) :param settings: Optional dictionary of ClickHouse settings (key/string values) :param use_database: Send the database parameter to ClickHouse so the command will be executed in the client - database context. Otherwise, no database will be specified with the command. This is useful for determining + database context. Otherwise, no database will be specified with the command. This is useful for determining the default user database - :param external_data ClickHouse "external data" to send with command/query + :param external_data: ClickHouse "external data" to send with command/query + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: Decoded response from ClickHouse as either a string, int, or sequence of strings, or QuerySummary if no data returned """ @@ -593,7 +637,8 @@ class Client(ABC): column_type_names: Sequence[str] = None, column_oriented: bool = False, settings: Optional[Dict[str, Any]] = None, - context: InsertContext = None) -> QuerySummary: + context: InsertContext = None, + transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary: """ Method to insert multiple rows/data matrix of native Python objects. If context is specified arguments other than data are ignored @@ -610,6 +655,7 @@ class Client(ABC): :param settings: Optional dictionary of ClickHouse settings (key/string values) :param context: Optional reusable insert context to allow repeated inserts into the same table with different data batches + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: QuerySummary with summary information, throws exception if insert fails """ if (context is None or context.empty) and data is None: @@ -621,7 +667,8 @@ class Client(ABC): column_types, column_type_names, column_oriented, - settings) + settings, + transport_settings=transport_settings) if data is not None: if not context.empty: raise ProgrammingError('Attempting to insert new data with non-empty insert context') from None @@ -635,7 +682,8 @@ class Client(ABC): column_names: Optional[Sequence[str]] = None, column_types: Sequence[ClickHouseType] = None, column_type_names: Sequence[str] = None, - context: InsertContext = None) -> QuerySummary: + context: InsertContext = None, + transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary: """ Insert a pandas DataFrame into ClickHouse.
If context is specified arguments other than df are ignored :param table: ClickHouse table @@ -650,8 +698,10 @@ class Client(ABC): retrieved from the server :param context: Optional reusable insert context to allow repeated inserts into the same table with different data batches + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: QuerySummary with summary information, throws exception if insert fails """ + check_pandas() if context is None: if column_names is None: column_names = df.columns @@ -663,24 +713,28 @@ class Client(ABC): database, column_types=column_types, column_type_names=column_type_names, - settings=settings, context=context) + settings=settings, + transport_settings=transport_settings, + context=context) def insert_arrow(self, table: str, arrow_table, database: str = None, - settings: Optional[Dict] = None) -> QuerySummary: + settings: Optional[Dict] = None, + transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary: """ Insert a PyArrow Table into ClickHouse using raw Arrow format :param table: ClickHouse table :param arrow_table: PyArrow Table object :param database: Optional ClickHouse database :param settings: Optional dictionary of ClickHouse settings (key/string values) - :return: QuerySummary with summary information, throws exception if insert fails + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) """ + check_arrow() full_table = table if '.' in table or not database else f'{database}.{table}' compression = self.write_compression if self.write_compression in ('zstd', 'lz4') else None column_names, insert_block = arrow_buffer(arrow_table, compression) - return self.raw_insert(full_table, column_names, insert_block, settings, 'Arrow') + return self.raw_insert(full_table, column_names, insert_block, settings, 'Arrow', transport_settings) def create_insert_context(self, table: str, @@ -690,7 +744,8 @@ class Client(ABC): column_type_names: Sequence[str] = None, column_oriented: bool = False, settings: Optional[Dict[str, Any]] = None, - data: Optional[Sequence[Sequence[Any]]] = None) -> InsertContext: + data: Optional[Sequence[Sequence[Any]]] = None, + transport_settings: Optional[Dict[str, str]] = None) -> InsertContext: """ Builds a reusable insert context to hold state for the duration of an insert :param table: Target table @@ -704,7 +759,8 @@ class Client(ABC): :param column_oriented: If true the data is already "pivoted" in column form :param settings: Optional dictionary of ClickHouse settings (key/string values) :param data: Initial dataset for insert - :return Reusable insert context + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) + :return: Reusable insert context """ full_table = table if '.'
not in table: @@ -740,6 +796,7 @@ class Client(ABC): column_types, column_oriented=column_oriented, settings=settings, + transport_settings=transport_settings, data=data) def min_version(self, version_str: str) -> bool: @@ -780,15 +837,17 @@ class Client(ABC): insert_block: Union[str, bytes, Generator[bytes, None, None], BinaryIO] = None, settings: Optional[Dict] = None, fmt: Optional[str] = None, - compression: Optional[str] = None) -> QuerySummary: + compression: Optional[str] = None, + transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary: """ Insert data already formatted in a bytes object :param table: Table name (whether qualified with the database name or not) :param column_names: Sequence of column names :param insert_block: Binary or string data already in a recognized ClickHouse format :param settings: Optional dictionary of ClickHouse settings (key/string values) - :param compression: Recognized ClickHouse `Accept-Encoding` header compression value :param fmt: Valid clickhouse format + :param compression: Recognized ClickHouse `Accept-Encoding` header compression value + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) """ @abstractmethod diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/context.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/context.py index 00125a0bdda..0da870bcb43 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/context.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/context.py @@ -16,7 +16,8 @@ class BaseQueryContext: column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, encoding: Optional[str] = None, use_extended_dtypes: bool = False, - use_numpy: bool = False): + use_numpy: bool = False, + transport_settings: Optional[Dict[str, str]] = None): self.settings = settings or {} if query_formats is None: self.type_formats = _empty_map @@ -36,6 +37,7 @@ class BaseQueryContext: for type_name, fmt in fmt.items()} self.query_formats = query_formats or {} self.column_formats = column_formats or {} + self.transport_settings = transport_settings self.column_name = None self.encoding = encoding self.use_numpy = use_numpy diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py index 793ca3f953d..c055c639675 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py @@ -54,6 +54,7 @@ class HttpClient(Client): username: str, password: str, database: str, + access_token: Optional[str] = None, compress: Union[bool, str] = True, query_limit: int = 0, query_retries: int = 2, @@ -73,12 +74,16 @@ class HttpClient(Client): apply_server_timezone: Optional[Union[str, bool]] = None, show_clickhouse_errors: Optional[bool] = None, autogenerate_session_id: Optional[bool] = None, - tls_mode: Optional[str] = None): + tls_mode: Optional[str] = None, + proxy_path: str = ''): """ Create an HTTP ClickHouse Connect client See clickhouse_connect.get_client for parameters """ - self.url = f'{interface}://{host}:{port}' + proxy_path = proxy_path.lstrip('/') + if proxy_path: + proxy_path = '/' + proxy_path + self.url = f'{interface}://{host}:{port}{proxy_path}' self.headers = {} self.params = dict_copy(HttpClient.params) ch_settings = dict_copy(settings, self.params) @@ -115,8 +120,11 @@ class HttpClient(Client): else: 
self.http = default_pool_manager() - if (not client_cert or tls_mode in ('strict', 'proxy')) and username: + if access_token: + self.headers['Authorization'] = f'Bearer {access_token}' + elif (not client_cert or tls_mode in ('strict', 'proxy')) and username: self.headers['Authorization'] = 'Basic ' + b64encode(f'{username}:{password}'.encode()).decode() + self.headers['User-Agent'] = common.build_client_name(client_name) self._read_format = self._write_format = 'Native' self._transform = NativeTransform() @@ -180,6 +188,12 @@ class HttpClient(Client): def get_client_setting(self, key) -> Optional[str]: return self.params.get(key) + def set_access_token(self, access_token: str): + auth_header = self.headers.get('Authorization') + if auth_header and not auth_header.startswith('Bearer'): + raise ProgrammingError('Cannot set access token when a different auth type is used') + self.headers['Authorization'] = f'Bearer {access_token}' + def _prep_query(self, context: QueryContext): final_query = super()._prep_query(context) if context.is_insert: @@ -228,7 +242,7 @@ class HttpClient(Client): headers['Content-Type'] = 'text/plain; charset=utf-8' response = self._raw_request(body, params, - headers, + dict_copy(headers, context.transport_settings), stream=True, retries=self.query_retries, fields=fields, @@ -266,7 +280,7 @@ class HttpClient(Client): if self.database: params['database'] = self.database params.update(self._validate_settings(context.settings)) - + headers = dict_copy(headers, context.transport_settings) response = self._raw_request(block_gen, params, headers, error_handler=error_handler, server_wait=False) logger.debug('Context insert response code: %d, content: %s', response.status, response.data) context.data = None @@ -277,7 +291,8 @@ class HttpClient(Client): insert_block: Union[str, bytes, Generator[bytes, None, None], BinaryIO] = None, settings: Optional[Dict] = None, fmt: Optional[str] = None, - compression: Optional[str] = None) -> QuerySummary: + compression: Optional[str] = None, + transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary: """ See BaseClient doc_string for this method """ @@ -297,6 +312,7 @@ class HttpClient(Client): if self.database: params['database'] = self.database params.update(self._validate_settings(settings or {})) + headers = dict_copy(headers, transport_settings) response = self._raw_request(insert_block, params, headers, server_wait=False) logger.debug('Raw insert response code: %d, content: %s', response.status, response.data) return QuerySummary(self._summary(response)) @@ -318,7 +334,8 @@ class HttpClient(Client): data: Union[str, bytes] = None, settings: Optional[Dict] = None, use_database: int = True, - external_data: Optional[ExternalData] = None) -> Union[str, int, Sequence[str], QuerySummary]: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> Union[str, int, Sequence[str], QuerySummary]: """ See BaseClient doc_string for this method """ @@ -346,7 +363,7 @@ class HttpClient(Client): if use_database and self.database: params['database'] = self.database params.update(self._validate_settings(settings or {})) - + headers = dict_copy(headers, transport_settings) method = 'POST' if payload or fields else 'GET' response = self._raw_request(payload, params, headers, method, fields=fields, server_wait=False) if response.data: @@ -398,16 +415,18 @@ class HttpClient(Client): data = data.encode() headers = dict_copy(self.headers, headers) attempts = 0 + final_params = {} if server_wait: - 
params['wait_end_of_query'] = '1' + final_params['wait_end_of_query'] = '1' # We can't actually read the progress headers, but we enable them so ClickHouse sends something # to keep the connection alive when waiting for long-running queries and (2) to get summary information # if not streaming if self._send_progress: - params['send_progress_in_http_headers'] = '1' + final_params['send_progress_in_http_headers'] = '1' if self._progress_interval: - params['http_headers_progress_interval_ms'] = self._progress_interval - final_params = dict_copy(self.params, params) + final_params['http_headers_progress_interval_ms'] = self._progress_interval + final_params = dict_copy(self.params, final_params) + final_params = dict_copy(final_params, params) url = f'{self.url}?{urlencode(final_params)}' kwargs = { 'headers': headers, @@ -466,24 +485,27 @@ class HttpClient(Client): settings: Optional[Dict[str, Any]] = None, fmt: str = None, use_database: bool = True, - external_data: Optional[ExternalData] = None) -> bytes: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> bytes: """ See BaseClient doc_string for this method """ body, params, fields = self._prep_raw_query(query, parameters, settings, fmt, use_database, external_data) - return self._raw_request(body, params, fields=fields).data + return self._raw_request(body, params, fields=fields, headers=transport_settings).data def raw_stream(self, query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, settings: Optional[Dict[str, Any]] = None, fmt: str = None, use_database: bool = True, - external_data: Optional[ExternalData] = None) -> io.IOBase: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> io.IOBase: """ See BaseClient doc_string for this method """ body, params, fields = self._prep_raw_query(query, parameters, settings, fmt, use_database, external_data) - return self._raw_request(body, params, fields=fields, stream=True, server_wait=False) + return self._raw_request(body, params, fields=fields, stream=True, server_wait=False, + headers=transport_settings) def _prep_raw_query(self, query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]], @@ -515,7 +537,7 @@ class HttpClient(Client): See BaseClient doc_string for this method """ try: - response = self.http.request('GET', f'{self.url}/ping', timeout=3) + response = self.http.request('GET', f'{self.url}/ping', timeout=3, preload_content=True) return 200 <= response.status < 300 except HTTPError: logger.debug('ping failed', exc_info=True) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py index 859c7d88ba8..37bb24e616e 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py @@ -55,10 +55,10 @@ def close_managers(): def get_pool_manager_options(keep_interval: int = DEFAULT_KEEP_INTERVAL, keep_count: int = DEFAULT_KEEP_COUNT, keep_idle: int = DEFAULT_KEEP_IDLE, - ca_cert: str = None, + ca_cert: Optional[str] = None, verify: bool = True, - client_cert: str = None, - client_cert_key: str = None, + client_cert: Optional[str] = None, + client_cert_key: Optional[str] = None, **options) -> Dict[str, Any]: socket_options = core_socket_options.copy() if getattr(socket, 'TCP_KEEPINTVL', None) is not None: @@ -88,12 +88,12 @@ def 
get_pool_manager_options(keep_interval: int = DEFAULT_KEEP_INTERVAL, def get_pool_manager(keep_interval: int = DEFAULT_KEEP_INTERVAL, keep_count: int = DEFAULT_KEEP_COUNT, keep_idle: int = DEFAULT_KEEP_IDLE, - ca_cert: str = None, + ca_cert: Optional[str] = None, verify: bool = True, - client_cert: str = None, - client_cert_key: str = None, - http_proxy: str = None, - https_proxy: str = None, + client_cert: Optional[str] = None, + client_cert_key: Optional[str] = None, + http_proxy: Optional[str] = None, + https_proxy: Optional[str] = None, **options): options = get_pool_manager_options(keep_interval, keep_count, @@ -228,12 +228,13 @@ class ResponseSource: read_gen = response.stream(chunk_size, decompress is None) while True: while not done: + chunk = None try: chunk = next(read_gen, None) # Always try to read at least one chunk if there are any left except Exception: # pylint: disable=broad-except # By swallowing an unexpected exception reading the stream, we will let consumers decide how to # handle the unexpected end of stream - pass + logger.warning('unexpected failure to read next chunk', exc_info=True) if not chunk: done = True break diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/insert.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/insert.py index a54ae37c565..4cb24e1aaef 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/insert.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/insert.py @@ -42,8 +42,9 @@ class InsertContext(BaseQueryContext): compression: Optional[Union[str, bool]] = None, query_formats: Optional[Dict[str, str]] = None, column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, - block_size: Optional[int] = None): - super().__init__(settings, query_formats, column_formats) + block_size: Optional[int] = None, + transport_settings: Optional[Dict[str, str]] = None): + super().__init__(settings, query_formats, column_formats, transport_settings=transport_settings) self.table = table self.column_names = column_names self.column_types = column_types @@ -148,19 +149,20 @@ class InsertContext(BaseQueryContext): data = [] for df_col_name, col_name, ch_type in zip(df.columns, self.column_names, self.column_types): df_col = df[df_col_name] - d_type = str(df_col.dtype) + d_type_kind = df_col.dtype.kind if ch_type.python_type == int: - if 'float' in d_type: + if d_type_kind == 'f': df_col = df_col.round().astype(ch_type.base_type, copy=False) - else: - df_col = df_col.astype(ch_type.base_type, copy=False) - elif 'datetime' in ch_type.np_type and (pd_time_test(df_col) or 'datetime64[ns' in d_type): + elif d_type_kind in ('i', 'u') and not df_col.hasnans: + data.append(df_col.to_list()) + continue + elif 'datetime' in ch_type.np_type and (pd_time_test(df_col) or 'datetime64[ns' in str(df_col.dtype)): div = ch_type.nano_divisor data.append([None if pd.isnull(x) else x.value // div for x in df_col]) self.column_formats[col_name] = 'int' continue if ch_type.nullable: - if d_type == 'object': + if d_type_kind == 'O': # This is ugly, but the multiple replaces seem required as a result of this bug: # https://github.com/pandas-dev/pandas/issues/29024 df_col = df_col.replace({pd.NaT: None}).replace({np.nan: None}) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/query.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/query.py index bd10270e71c..6c764cfaea6 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/query.py +++ 
b/contrib/python/clickhouse-connect/clickhouse_connect/driver/query.py @@ -3,7 +3,7 @@ import re import pytz from io import IOBase -from typing import Any, Tuple, Dict, Sequence, Optional, Union, Generator +from typing import Any, Tuple, Dict, Sequence, Optional, Union, Generator, BinaryIO from datetime import tzinfo from pytz.exceptions import UnknownTimeZoneError @@ -52,7 +52,8 @@ class QueryContext(BaseQueryContext): as_pandas: bool = False, streaming: bool = False, apply_server_tz: bool = False, - external_data: Optional[ExternalData] = None): + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None): """ Initializes various configuration settings for the query context @@ -85,7 +86,8 @@ class QueryContext(BaseQueryContext): column_formats, encoding, use_extended_dtypes if use_extended_dtypes is not None else False, - use_numpy if use_numpy is not None else False) + use_numpy if use_numpy is not None else False, + transport_settings=transport_settings) self.query = query self.parameters = parameters or {} self.use_none = True if use_none is None else use_none @@ -189,7 +191,8 @@ class QueryContext(BaseQueryContext): use_extended_dtypes: Optional[bool] = None, as_pandas: bool = False, streaming: bool = False, - external_data: Optional[ExternalData] = None) -> 'QueryContext': + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> 'QueryContext': """ Creates Query context copy with parameters overridden/updated as appropriate. """ @@ -210,7 +213,8 @@ class QueryContext(BaseQueryContext): as_pandas, streaming, self.apply_server_tz, - self.external_data if external_data is None else external_data) + self.external_data if external_data is None else external_data, + self.transport_settings if transport_settings is None else transport_settings) def _update_query(self): self.final_query, self.bind_params = bind_query(self.query, self.parameters, self.server_tz) @@ -374,7 +378,7 @@ def to_arrow_batches(buffer: IOBase) -> StreamContext: return StreamContext(buffer, reader) -def arrow_buffer(table, compression: Optional[str] = None) -> Tuple[Sequence[str], bytes]: +def arrow_buffer(table, compression: Optional[str] = None) -> Tuple[Sequence[str], Union[bytes, BinaryIO]]: pyarrow = check_arrow() options = None if compression in ('zstd', 'lz4'): diff --git a/contrib/python/clickhouse-connect/ya.make b/contrib/python/clickhouse-connect/ya.make index 3ff1aff7343..cead3e1f6d9 100644 --- a/contrib/python/clickhouse-connect/ya.make +++ b/contrib/python/clickhouse-connect/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(0.8.10) +VERSION(0.8.18) LICENSE(Apache-2.0) |
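Taken together, the clickhouse-connect changes in this commit do more than thread transport_settings through: AsyncClient gains __aenter__/__aexit__ so it works as an async context manager, and the HTTP client gains bearer-token (access_token) and proxy_path support. A speculative end-to-end sketch follows; the host, port, token, and proxy path are placeholders, and it assumes get_async_client forwards these keyword arguments to HttpClient as the new signatures suggest.

import asyncio
import clickhouse_connect

async def main():
    # The __aenter__/__aexit__ methods added above close the client automatically;
    # access_token is sent as an Authorization: Bearer header, and proxy_path is
    # appended to the base URL (e.g. https://host:8443/ch-proxy/...).
    async with await clickhouse_connect.get_async_client(
            host='clickhouse.example.com',     # placeholder host
            port=8443,
            access_token='example-jwt-token',  # placeholder token
            proxy_path='ch-proxy') as client:  # placeholder proxy prefix
        print(await client.command('SELECT version()'))

asyncio.run(main())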