author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 09:58:56 +0300
committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 10:20:20 +0300
commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/python/clickhouse-connect
parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
download | ydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz
YQ Connector: move tests from yql to ydb (OSS)
Move the Connector tests folder from the yql folder to the ydb folder (synchronized with github).
Diffstat (limited to 'contrib/python/clickhouse-connect')
70 files changed, 8305 insertions, 0 deletions
diff --git a/contrib/python/clickhouse-connect/.dist-info/METADATA b/contrib/python/clickhouse-connect/.dist-info/METADATA new file mode 100644 index 0000000000..b078aa4b26 --- /dev/null +++ b/contrib/python/clickhouse-connect/.dist-info/METADATA @@ -0,0 +1,79 @@ +Metadata-Version: 2.1 +Name: clickhouse-connect +Version: 0.6.18 +Summary: ClickHouse Database Core Driver for Python, Pandas, and Superset +Home-page: https://github.com/ClickHouse/clickhouse-connect +Author: ClickHouse Inc. +Author-email: clients@clickhouse.com +License: Apache License 2.0 +Keywords: clickhouse,superset,sqlalchemy,http,driver +Classifier: Development Status :: 4 - Beta +Classifier: Intended Audience :: Developers +Classifier: License :: OSI Approved :: Apache Software License +Classifier: Programming Language :: Python :: 3.7 +Classifier: Programming Language :: Python :: 3.8 +Classifier: Programming Language :: Python :: 3.9 +Classifier: Programming Language :: Python :: 3.10 +Classifier: Programming Language :: Python :: 3.11 +Requires-Python: ~=3.7 +Description-Content-Type: text/markdown +License-File: LICENSE +Requires-Dist: certifi +Requires-Dist: urllib3 >=1.26 +Requires-Dist: pytz +Requires-Dist: zstandard +Requires-Dist: lz4 +Provides-Extra: arrow +Requires-Dist: pyarrow ; extra == 'arrow' +Provides-Extra: numpy +Requires-Dist: numpy ; extra == 'numpy' +Provides-Extra: orjson +Requires-Dist: orjson ; extra == 'orjson' +Provides-Extra: pandas +Requires-Dist: pandas ; extra == 'pandas' +Provides-Extra: sqlalchemy +Requires-Dist: sqlalchemy <2.0,>1.3.21 ; extra == 'sqlalchemy' + +## ClickHouse Connect + +A high performance core database driver for connecting ClickHouse to Python, Pandas, and Superset +* Pandas DataFrames +* Numpy Arrays +* PyArrow Tables +* Superset Connector +* SQLAlchemy 1.3 and 1.4 (limited feature set) + +ClickHouse Connect currently uses the ClickHouse HTTP interface for maximum compatibility. + + +### Installation + +``` +pip install clickhouse-connect +``` + +ClickHouse Connect requires Python 3.7 or higher. + + +### Superset Connectivity +ClickHouse Connect is fully integrated with Apache Superset. Previous versions of ClickHouse Connect utilized a +dynamically loaded Superset Engine Spec, but as of Superset v2.1.0 the engine spec was incorporated into the main +Apache Superset project and removed from clickhouse-connect in v0.6.0. If you have issues connecting to earlier +versions of Superset, please use clickhouse-connect v0.5.25. + +When creating a Superset Data Source, either use the provided connection dialog, or a SqlAlchemy DSN in the form +`clickhousedb://{username}:{password}@{host}:{port}`. + + +### SQLAlchemy Implementation +ClickHouse Connect incorporates a minimal SQLAlchemy implementation (without any ORM features) for compatibility with +Superset. It has only been tested against SQLAlchemy versions 1.3.x and 1.4.x, and is unlikely to work with more +complex SQLAlchemy applications. 
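A minimal sketch of connecting through this SQLAlchemy dialect, assuming SQLAlchemy 1.4 and a ClickHouse server reachable over HTTP; the host, port, and credentials below are placeholders, and the DSN follows the form documented above:

```
from sqlalchemy import create_engine, text

# The 'clickhousedb' scheme is registered by this package's entry points
# (see .dist-info/entry_points.txt later in this diff); placeholder credentials.
engine = create_engine('clickhousedb://default:password@localhost:8123')

with engine.connect() as conn:
    # Plain SQL queries work; ORM features are intentionally not provided.
    for row in conn.execute(text('SELECT version()')):
        print(row)
```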
+ + +### Complete Documentation +The documentation for ClickHouse Connect has moved to +[ClickHouse Docs](https://clickhouse.com/docs/en/integrations/language-clients/python/intro) + + + diff --git a/contrib/python/clickhouse-connect/.dist-info/entry_points.txt b/contrib/python/clickhouse-connect/.dist-info/entry_points.txt new file mode 100644 index 0000000000..8b6f77a970 --- /dev/null +++ b/contrib/python/clickhouse-connect/.dist-info/entry_points.txt @@ -0,0 +1,3 @@ +[sqlalchemy.dialects] +clickhousedb = clickhouse_connect.cc_sqlalchemy.dialect:ClickHouseDialect +clickhousedb.connect = clickhouse_connect.cc_sqlalchemy.dialect:ClickHouseDialect diff --git a/contrib/python/clickhouse-connect/.dist-info/top_level.txt b/contrib/python/clickhouse-connect/.dist-info/top_level.txt new file mode 100644 index 0000000000..8d79419fec --- /dev/null +++ b/contrib/python/clickhouse-connect/.dist-info/top_level.txt @@ -0,0 +1 @@ +clickhouse_connect diff --git a/contrib/python/clickhouse-connect/LICENSE b/contrib/python/clickhouse-connect/LICENSE new file mode 100644 index 0000000000..31495aae11 --- /dev/null +++ b/contrib/python/clickhouse-connect/LICENSE @@ -0,0 +1,203 @@ +Copyright 2022-2023 ClickHouse, Inc. + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2022-2023 ClickHouse, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/contrib/python/clickhouse-connect/README.md b/contrib/python/clickhouse-connect/README.md new file mode 100644 index 0000000000..4298c161c9 --- /dev/null +++ b/contrib/python/clickhouse-connect/README.md @@ -0,0 +1,43 @@ +## ClickHouse Connect + +A high performance core database driver for connecting ClickHouse to Python, Pandas, and Superset +* Pandas DataFrames +* Numpy Arrays +* PyArrow Tables +* Superset Connector +* SQLAlchemy 1.3 and 1.4 (limited feature set) + +ClickHouse Connect currently uses the ClickHouse HTTP interface for maximum compatibility. + + +### Installation + +``` +pip install clickhouse-connect +``` + +ClickHouse Connect requires Python 3.7 or higher. + + +### Superset Connectivity +ClickHouse Connect is fully integrated with Apache Superset. 
Previous versions of ClickHouse Connect utilized a +dynamically loaded Superset Engine Spec, but as of Superset v2.1.0 the engine spec was incorporated into the main +Apache Superset project and removed from clickhouse-connect in v0.6.0. If you have issues connecting to earlier +versions of Superset, please use clickhouse-connect v0.5.25. + +When creating a Superset Data Source, either use the provided connection dialog, or a SqlAlchemy DSN in the form +`clickhousedb://{username}:{password}@{host}:{port}`. + + +### SQLAlchemy Implementation +ClickHouse Connect incorporates a minimal SQLAlchemy implementation (without any ORM features) for compatibility with +Superset. It has only been tested against SQLAlchemy versions 1.3.x and 1.4.x, and is unlikely to work with more +complex SQLAlchemy applications. + + +### Complete Documentation +The documentation for ClickHouse Connect has moved to +[ClickHouse Docs](https://clickhouse.com/docs/en/integrations/language-clients/python/intro) + + + diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/__init__.py new file mode 100644 index 0000000000..81ee6dd3a0 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/__init__.py @@ -0,0 +1,6 @@ +from clickhouse_connect.driver import create_client + + +driver_name = 'clickhousedb' + +get_client = create_client diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py b/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py new file mode 100644 index 0000000000..15ce74c8e3 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py @@ -0,0 +1 @@ +version = '0.6.18' diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/__init__.py new file mode 100644 index 0000000000..ec80555281 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/__init__.py @@ -0,0 +1,6 @@ +from clickhouse_connect import driver_name +from clickhouse_connect.cc_sqlalchemy.datatypes.base import schema_types + +# pylint: disable=invalid-name +dialect_name = driver_name +ischema_names = schema_types diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/datatypes/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/datatypes/__init__.py new file mode 100644 index 0000000000..f364badd88 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/datatypes/__init__.py @@ -0,0 +1 @@ +import clickhouse_connect.cc_sqlalchemy.datatypes.sqltypes diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/datatypes/base.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/datatypes/base.py new file mode 100644 index 0000000000..14d60351f4 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/datatypes/base.py @@ -0,0 +1,135 @@ +import logging +from typing import Dict, Type + +from sqlalchemy.exc import CompileError + +from clickhouse_connect.datatypes.base import ClickHouseType, TypeDef, EMPTY_TYPE_DEF +from clickhouse_connect.datatypes.registry import parse_name, type_map +from clickhouse_connect.driver.query import format_query_value + +logger = logging.getLogger(__name__) + + +class ChSqlaType: + """ + A SQLAlchemy TypeEngine that wraps a ClickHouseType. 
We don't extend TypeEngine directly, instead all concrete + subclasses will inherit from TypeEngine. + """ + ch_type: ClickHouseType = None + generic_type: None + _ch_type_cls = None + _instance = None + _instance_cache: Dict[TypeDef, 'ChSqlaType'] = None + + def __init_subclass__(cls): + """ + Registers ChSqla type in the type map and sets the underlying ClickHouseType class to use to initialize + ChSqlaType instances + """ + base = cls.__name__ + if not cls._ch_type_cls: + try: + cls._ch_type_cls = type_map[base] + except KeyError: + logger.warning('Attempted to register SQLAlchemy type without corresponding ClickHouse Type') + return + schema_types.append(base) + sqla_type_map[base] = cls + cls._instance_cache = {} + + @classmethod + def build(cls, type_def: TypeDef): + """ + Factory function for building a ChSqlaType based on the type definition + :param type_def: -- TypeDef tuple that defines arguments for this instance + :return: Shared instance of a configured ChSqlaType + """ + return cls._instance_cache.setdefault(type_def, cls(type_def=type_def)) + + def __init__(self, type_def: TypeDef = EMPTY_TYPE_DEF): + """ + Basic constructor that does nothing but set the wrapped ClickHouseType. It is overridden in some cases + to add specific SqlAlchemy behavior when constructing subclasses "by hand", in which case the type_def + parameter is normally set to None and other keyword parameters used for construction + :param type_def: TypeDef tuple used to build the underlying ClickHouseType. This is normally populated by the + parse_name function + """ + self.type_def = type_def + self.ch_type = self._ch_type_cls.build(type_def) + + @property + def name(self): + return self.ch_type.name + + @name.setter + def name(self, name): # Keep SQLAlchemy from overriding our ClickHouse name + pass + + @property + def nullable(self): + return self.ch_type.nullable + + @property + def low_card(self): + return self.ch_type.low_card + + @staticmethod + def result_processor(): + """ + Override for the SqlAlchemy TypeEngine result_processor method, which is used to convert row values to the + correct Python type. The core driver handles this automatically, so we always return None. + """ + return None + + @staticmethod + def _cached_result_processor(*_): + """ + Override for the SqlAlchemy TypeEngine _cached_result_processor method to prevent weird behavior + when SQLAlchemy tries to cache. + """ + return None + + @staticmethod + def _cached_literal_processor(*_): + """ + Override for the SqlAlchemy TypeEngine _cached_literal_processor. We delegate to the driver format_query_value + method and should be able to ignore literal_processor definitions in the dialect, which are verbose and + confusing. + """ + return format_query_value + + def _compiler_dispatch(self, _visitor, **_): + """ + Override for the SqlAlchemy TypeEngine _compiler_dispatch method to sidestep unnecessary layers and complexity + when generating the type name. The underlying ClickHouseType generates the correct name + :return: Name generated by the underlying driver. 
+ """ + return self.name + + +class CaseInsensitiveDict(dict): + def __setitem__(self, key, value): + super().__setitem__(key.lower(), value) + + def __getitem__(self, item): + return super().__getitem__(item.lower()) + + +sqla_type_map: Dict[str, Type[ChSqlaType]] = CaseInsensitiveDict() +schema_types = [] + + +def sqla_type_from_name(name: str) -> ChSqlaType: + """ + Factory function to convert a ClickHouse type name to the appropriate ChSqlaType + :param name: Name returned from ClickHouse using Native protocol or WithNames format + :return: ChSqlaType + """ + base, name, type_def = parse_name(name) + try: + type_cls = sqla_type_map[base] + except KeyError: + err_str = f'Unrecognized ClickHouse type base: {base} name: {name}' + logger.error(err_str) + raise CompileError(err_str) from KeyError + return type_cls.build(type_def) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/datatypes/sqltypes.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/datatypes/sqltypes.py new file mode 100644 index 0000000000..d611f18912 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/datatypes/sqltypes.py @@ -0,0 +1,405 @@ +import pytz +from enum import Enum as PyEnum +from typing import Type, Union, Sequence + +from sqlalchemy.types import Integer, Float, Numeric, Boolean as SqlaBoolean, \ + UserDefinedType, String as SqlaString, DateTime as SqlaDateTime, Date as SqlaDate +from sqlalchemy.exc import ArgumentError + +from clickhouse_connect.cc_sqlalchemy.datatypes.base import ChSqlaType +from clickhouse_connect.datatypes.base import TypeDef, NULLABLE_TYPE_DEF, LC_TYPE_DEF, EMPTY_TYPE_DEF +from clickhouse_connect.datatypes.numeric import Enum8 as ChEnum8, Enum16 as ChEnum16 +from clickhouse_connect.driver.common import decimal_prec + + +class Int8(ChSqlaType, Integer): + pass + + +class UInt8(ChSqlaType, Integer): + pass + + +class Int16(ChSqlaType, Integer): + pass + + +class UInt16(ChSqlaType, Integer): + pass + + +class Int32(ChSqlaType, Integer): + pass + + +class UInt32(ChSqlaType, Integer): + pass + + +class Int64(ChSqlaType, Integer): + pass + + +class UInt64(ChSqlaType, Integer): + pass + + +class Int128(ChSqlaType, Integer): + pass + + +class UInt128(ChSqlaType, Integer): + pass + + +class Int256(ChSqlaType, Integer): + pass + + +class UInt256(ChSqlaType, Integer): + pass + + +class Float32(ChSqlaType, Float): + def __init__(self, type_def: TypeDef = EMPTY_TYPE_DEF): + ChSqlaType.__init__(self, type_def) + Float.__init__(self) + + +class Float64(ChSqlaType, Float): + def __init__(self, type_def: TypeDef = EMPTY_TYPE_DEF): + ChSqlaType.__init__(self, type_def) + Float.__init__(self) + + +class Bool(ChSqlaType, SqlaBoolean): + def __init__(self, type_def: TypeDef = EMPTY_TYPE_DEF): + ChSqlaType.__init__(self, type_def) + SqlaBoolean.__init__(self) + + +class Boolean(Bool): + pass + + +class Decimal(ChSqlaType, Numeric): + dec_size = 0 + + def __init__(self, precision: int = 0, scale: int = 0, type_def: TypeDef = None): + """ + Construct either with precision and scale (for DDL), or a TypeDef with those values (by name) + :param precision: Number of digits the Decimal + :param scale: Digits after the decimal point + :param type_def: Parsed type def from ClickHouse arguments + """ + if type_def: + if self.dec_size: + precision = decimal_prec[self.dec_size] + scale = type_def.values[0] + else: + precision, scale = type_def.values + elif not precision or scale < 0 or scale > precision: + raise ArgumentError('Invalid 
precision or scale for ClickHouse Decimal type') + else: + type_def = TypeDef(values=(precision, scale)) + ChSqlaType.__init__(self, type_def) + Numeric.__init__(self, precision, scale) + + +# pylint: disable=duplicate-code +class Decimal32(Decimal): + dec_size = 32 + + +class Decimal64(Decimal): + dec_size = 64 + + +class Decimal128(Decimal): + dec_size = 128 + + +class Decimal256(Decimal): + dec_size = 256 + + +class Enum(ChSqlaType, UserDefinedType): + _size = 16 + python_type = str + + def __init__(self, enum: Type[PyEnum] = None, keys: Sequence[str] = None, values: Sequence[int] = None, + type_def: TypeDef = None): + """ + Construct a ClickHouse enum either from a Python Enum or parallel lists of keys and value. Note that + Python enums do not support empty strings as keys, so the alternate keys/values must be used in that case + :param enum: Python enum to convert + :param keys: List of string keys + :param values: List of integer values + :param type_def: TypeDef from parse_name function + """ + if not type_def: + if enum: + keys = [e.name for e in enum] + values = [e.value for e in enum] + self._validate(keys, values) + if self.__class__.__name__ == 'Enum': + if max(values) <= 127 and min(values) >= -128: + self._ch_type_cls = ChEnum8 + else: + self._ch_type_cls = ChEnum16 + type_def = TypeDef(keys=tuple(keys), values=tuple(values)) + super().__init__(type_def) + + @classmethod + def _validate(cls, keys: Sequence, values: Sequence): + bad_key = next((x for x in keys if not isinstance(x, str)), None) + if bad_key: + raise ArgumentError(f'ClickHouse enum key {bad_key} is not a string') + bad_value = next((x for x in values if not isinstance(x, int)), None) + if bad_value: + raise ArgumentError(f'ClickHouse enum value {bad_value} is not an integer') + value_min = -(2 ** (cls._size - 1)) + value_max = 2 ** (cls._size - 1) - 1 + bad_value = next((x for x in values if x < value_min or x > value_max), None) + if bad_value: + raise ArgumentError(f'Clickhouse enum value {bad_value} is out of range') + + +class Enum8(Enum): + _size = 8 + _ch_type_cls = ChEnum8 + + +class Enum16(Enum): + _ch_type_cls = ChEnum16 + + +class String(ChSqlaType, UserDefinedType): + python_type = str + + +class FixedString(ChSqlaType, SqlaString): + def __init__(self, size: int = -1, type_def: TypeDef = None): + if not type_def: + type_def = TypeDef(values=(size,)) + ChSqlaType.__init__(self, type_def) + SqlaString.__init__(self, size) + + +class IPv4(ChSqlaType, UserDefinedType): + python_type = None + + +class IPv6(ChSqlaType, UserDefinedType): + python_type = None + + +class UUID(ChSqlaType, UserDefinedType): + python_type = None + + +class Date(ChSqlaType, SqlaDate): + pass + + +class Date32(ChSqlaType, SqlaDate): + pass + + +class DateTime(ChSqlaType, SqlaDateTime): + def __init__(self, tz: str = None, type_def: TypeDef = None): + """ + Date time constructor with optional ClickHouse timezone parameter if not constructed with TypeDef + :param tz: Timezone string as defined in pytz + :param type_def: TypeDef from parse_name function + """ + if not type_def: + if tz: + pytz.timezone(tz) + type_def = TypeDef(values=(f"'{tz}'",)) + else: + type_def = EMPTY_TYPE_DEF + ChSqlaType.__init__(self, type_def) + SqlaDateTime.__init__(self) + + +class DateTime64(ChSqlaType, SqlaDateTime): + def __init__(self, precision: int = None, tz: str = None, type_def: TypeDef = None): + """ + Date time constructor with precision and timezone parameters if not constructed with TypeDef + :param precision: Usually 3/6/9 for 
mill/micro/nansecond precision on ClickHouse side + :param tz: Timezone string as defined in pytz + :param type_def: TypeDef from parse_name function + """ + if not type_def: + if tz: + pytz.timezone(tz) + type_def = TypeDef(values=(precision, f"'{tz}'")) + else: + type_def = TypeDef(values=(precision,)) + prec = type_def.values[0] if len(type_def.values) else None + if not isinstance(prec, int) or prec < 0 or prec > 9: + raise ArgumentError(f'Invalid precision value {prec} for ClickHouse DateTime64') + ChSqlaType.__init__(self, type_def) + SqlaDateTime.__init__(self) + + +class Nullable: + """ + Class "wrapper" to use in DDL construction. It is never actually initialized but instead creates the "wrapped" + type with a Nullable wrapper + """ + + def __new__(cls, element: Union[ChSqlaType, Type[ChSqlaType]]): + """ + Actually returns an instance of the enclosed type with a Nullable wrapper. If element is an instance, + constructs a new instance with a copied TypeDef plus the Nullable wrapper. If element is just a type, + constructs a new element of that type with only the Nullable wrapper. + :param element: ChSqlaType instance or class to wrap + """ + if callable(element): + return element(type_def=NULLABLE_TYPE_DEF) + if element.low_card: + raise ArgumentError('Low Cardinality type cannot be Nullable') + orig = element.type_def + wrappers = orig if 'Nullable' in orig.wrappers else orig.wrappers + ('Nullable',) + return element.__class__(type_def=TypeDef(wrappers, orig.keys, orig.values)) + + +class LowCardinality: + """ + Class "wrapper" to use in DDL construction. It is never actually instantiated but instead creates the "wrapped" + type with a LowCardinality wrapper + """ + + def __new__(cls, element: Union[ChSqlaType, Type[ChSqlaType]]): + """ + Actually returns an instance of the enclosed type with a LowCardinality wrapper. If element is an instance, + constructs a new instance with a copied TypeDef plus the LowCardinality wrapper. If element is just a type, + constructs a new element of that type with only the LowCardinality wrapper. 
+ :param element: ChSqlaType instance or class to wrap + """ + if callable(element): + return element(type_def=LC_TYPE_DEF) + orig = element.type_def + wrappers = orig if 'LowCardinality' in orig.wrappers else ('LowCardinality',) + orig.wrappers + return element.__class__(type_def=TypeDef(wrappers, orig.keys, orig.values)) + + +class Array(ChSqlaType, UserDefinedType): + python_type = list + + def __init__(self, element: Union[ChSqlaType, Type[ChSqlaType]] = None, type_def: TypeDef = None): + """ + Array constructor that can take a wrapped Array type if not constructed from a TypeDef + :param element: ChSqlaType instance or class to wrap + :param type_def: TypeDef from parse_name function + """ + if not type_def: + if callable(element): + element = element() + type_def = TypeDef(values=(element.name,)) + super().__init__(type_def) + + +class Map(ChSqlaType, UserDefinedType): + python_type = dict + + def __init__(self, key_type: Union[ChSqlaType, Type[ChSqlaType]] = None, + value_type: Union[ChSqlaType, Type[ChSqlaType]] = None, type_def: TypeDef = None): + """ + Map constructor that can take a wrapped key/values types if not constructed from a TypeDef + :param key_type: ChSqlaType instance or class to use as keys for the Map + :param value_type: ChSqlaType instance or class to use as values for the Map + :param type_def: TypeDef from parse_name function + """ + if not type_def: + if callable(key_type): + key_type = key_type() + if callable(value_type): + value_type = value_type() + type_def = TypeDef(values=(key_type.name, value_type.name)) + super().__init__(type_def) + + +class Tuple(ChSqlaType, UserDefinedType): + python_type = tuple + + def __init__(self, elements: Sequence[Union[ChSqlaType, Type[ChSqlaType]]] = None, type_def: TypeDef = None): + """ + Tuple constructor that can take a list of element types if not constructed from a TypeDef + :param elements: sequence of ChSqlaType instance or class to use as tuple element types + :param type_def: TypeDef from parse_name function + """ + if not type_def: + values = [et() if callable(et) else et for et in elements] + type_def = TypeDef(values=tuple(v.name for v in values)) + super().__init__(type_def) + + +class JSON(ChSqlaType, UserDefinedType): + """ + Note this isn't currently supported for insert/select, only table definitions + """ + python_type = None + + +class Nested(ChSqlaType, UserDefinedType): + """ + Note this isn't currently supported for insert/select, only table definitions + """ + python_type = None + + +class Object(ChSqlaType, UserDefinedType): + """ + Note this isn't currently supported for insert/select, only table definitions + """ + python_type = None + + def __init__(self, fmt: str = None, type_def: TypeDef = None): + if not type_def: + type_def = TypeDef(values=(fmt,)) + super().__init__(type_def) + + +class SimpleAggregateFunction(ChSqlaType, UserDefinedType): + python_type = None + + def __init__(self, name: str = None, element: Union[ChSqlaType, Type[ChSqlaType]] = None, type_def: TypeDef = None): + """ + Constructor that can take the SimpleAggregateFunction name and wrapped type if not constructed from a TypeDef + :param name: Aggregate function name + :param element: ChSqlaType instance or class which the function aggregates + :param type_def: TypeDef from parse_name function + """ + if not type_def: + if callable(element): + element = element() + type_def = TypeDef(values=(name, element.name,)) + super().__init__(type_def) + + +class AggregateFunction(ChSqlaType, UserDefinedType): + """ + Note this isn't 
currently supported for insert/select, only table definitions + """ + python_type = None + + def __init__(self, *params, type_def: TypeDef = None): + """ + Simply wraps the parameters for AggregateFunction for DDL, unless the TypeDef is specified. + Callables or actual types are converted to their names. + :param params: AggregateFunction parameters + :param type_def: TypeDef from parse_name function + """ + if not type_def: + values = () + for x in params: + if callable(x): + x = x() + if isinstance(x, ChSqlaType): + x = x.name + values += (x,) + type_def = TypeDef(values=values) + super().__init__(type_def) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/ddl/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/ddl/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/ddl/__init__.py diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/ddl/custom.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/ddl/custom.py new file mode 100644 index 0000000000..b7eee4ad7d --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/ddl/custom.py @@ -0,0 +1,40 @@ +from sqlalchemy.sql.ddl import DDL +from sqlalchemy.exc import ArgumentError + +from clickhouse_connect.driver.query import quote_identifier + + +# pylint: disable=too-many-ancestors,abstract-method +class CreateDatabase(DDL): + """ + SqlAlchemy DDL statement that is essentially an alternative to the built in CreateSchema DDL class + """ + # pylint: disable-msg=too-many-arguments + def __init__(self, name: str, engine: str = None, zoo_path: str = None, shard_name: str = '{shard}', + replica_name: str = '{replica}'): + """ + :param name: Database name + :param engine: Database ClickHouse engine type + :param zoo_path: ClickHouse zookeeper path for Replicated database engine + :param shard_name: Clickhouse shard name for Replicated database engine + :param replica_name: Replica name for Replicated database engine + """ + if engine and engine not in ('Ordinary', 'Atomic', 'Lazy', 'Replicated'): + raise ArgumentError(f'Unrecognized engine type {engine}') + stmt = f'CREATE DATABASE {quote_identifier(name)}' + if engine: + stmt += f' Engine {engine}' + if engine == 'Replicated': + if not zoo_path: + raise ArgumentError('zoo_path is required for Replicated Database Engine') + stmt += f" ('{zoo_path}', '{shard_name}', '{replica_name}'" + super().__init__(stmt) + + +# pylint: disable=too-many-ancestors,abstract-method +class DropDatabase(DDL): + """ + Alternative DDL statement for built in SqlAlchemy DropSchema DDL class + """ + def __init__(self, name: str): + super().__init__(f'DROP DATABASE {quote_identifier(name)}') diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/ddl/tableengine.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/ddl/tableengine.py new file mode 100644 index 0000000000..598e2e5adb --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/ddl/tableengine.py @@ -0,0 +1,247 @@ +import logging +from typing import Type, Sequence, Optional, Dict + +from sqlalchemy.exc import ArgumentError, SQLAlchemyError +from sqlalchemy.sql.base import SchemaEventTarget +from sqlalchemy.sql.visitors import Visitable + +logger = logging.getLogger(__name__) + +engine_map: Dict[str, Type['TableEngine']] = {} + + +def tuple_expr(expr_name, value): + 
""" + Create a table parameter with a tuple or list correctly formatted + :param expr_name: parameter + :param value: string or tuple of strings to format + :return: formatted parameter string + """ + if value is None: + return '' + v = f'{expr_name.strip()}' + if isinstance(value, (tuple, list)): + return f" {v} ({','.join(value)})" + return f'{v} {value}' + + +class TableEngine(SchemaEventTarget, Visitable): + """ + SqlAlchemy Schema element to support ClickHouse table engines. At the moment provides no real + functionality other than the CREATE TABLE argument string + """ + arg_names = () + quoted_args = set() + optional_args = set() + eng_params = () + + def __init_subclass__(cls, **kwargs): + engine_map[cls.__name__] = cls + + def __init__(self, kwargs): + # pylint: disable=no-value-for-parameter + Visitable.__init__(self) + self.name = self.__class__.__name__ + te_name = f'{self.name} Table Engine' + engine_args = [] + for arg_name in self.arg_names: + v = kwargs.pop(arg_name, None) + if v is None: + if arg_name in self.optional_args: + continue + raise ValueError(f'Required engine parameter {arg_name} not provided for {te_name}') + if arg_name in self.quoted_args: + engine_args.append(f"'{v}'") + else: + engine_args.append(v) + if engine_args: + self.arg_str = f'({", ".join(engine_args)})' + params = [] + for param_name in self.eng_params: + v = kwargs.pop(param_name, None) + if v is not None: + params.append(tuple_expr(param_name.upper().replace('_', ' '), v)) + + self.full_engine = 'Engine ' + self.name + if engine_args: + self.full_engine += f'({", ".join(engine_args)})' + if params: + self.full_engine += ' ' + ' '.join(params) + + def compile(self): + return self.full_engine + + def check_primary_keys(self, primary_keys: Sequence): + raise SQLAlchemyError(f'Table Engine {self.name} does not support primary keys') + + def _set_parent(self, parent, **_kwargs): + parent.engine = self + + +class Memory(TableEngine): + pass + + +class Log(TableEngine): + pass + + +class StripeLog(TableEngine): + pass + + +class TinyLog(TableEngine): + pass + + +class Null(TableEngine): + pass + + +class Set(TableEngine): + pass + + +class Dictionary(TableEngine): + arg_names = ['dictionary'] + + # pylint: disable=unused-argument + def __init__(self, dictionary: str = None): + super().__init__(locals()) + + +class Merge(TableEngine): + arg_names = ['db_name, tables_regexp'] + + # pylint: disable=unused-argument + def __init__(self, db_name: str = None, tables_regexp: str = None): + super().__init__(locals()) + + +class File(TableEngine): + arg_names = ['fmt'] + + # pylint: disable=unused-argument + def __init__(self, fmt: str = None): + super().__init__(locals()) + + +class Distributed(TableEngine): + arg_names = ['cluster', 'database', 'table', 'sharding_key', 'policy_name'] + optional_args = {'sharding_key', 'policy_name'} + + # pylint: disable=unused-argument + def __init__(self, cluster: str = None, database: str = None, table=None, + sharding_key: str = None, policy_name: str = None): + super().__init__(locals()) + + +class MergeTree(TableEngine): + eng_params = ['order_by', 'partition_key', 'primary_key', 'sample_by'] + + # pylint: disable=unused-argument + def __init__(self, order_by: str = None, primary_key: str = None, + partition_by: str = None, sample_by: str = None): + if not order_by and not primary_key: + raise ArgumentError(None, 'Either PRIMARY KEY or ORDER BY must be specified') + super().__init__(locals()) + + +class SummingMergeTree(MergeTree): + pass + + +class 
AggregatingMergeTree(MergeTree): + pass + + +class ReplacingMergeTree(TableEngine): + arg_names = ['ver'] + optional_args = set(arg_names) + eng_params = MergeTree.eng_params + + # pylint: disable=unused-argument + def __init__(self, ver: str = None, order_by: str = None, primary_key: str = None, + partition_by: str = None, sample_by: str = None): + if not order_by and not primary_key: + raise ArgumentError(None, 'Either PRIMARY KEY or ORDER BY must be specified') + super().__init__(locals()) + + +class CollapsingMergeTree(TableEngine): + arg_names = ['sign'] + eng_params = MergeTree.eng_params + + # pylint: disable=unused-argument + def __init__(self, sign: str = None, order_by: str = None, primary_key: str = None, + partition_by: str = None, sample_by: str = None): + if not order_by and not primary_key: + raise ArgumentError(None, 'Either PRIMARY KEY or ORDER BY must be specified') + super().__init__(locals()) + + +class VersionedCollapsingMergeTree(TableEngine): + arg_names = ['sign', 'version'] + eng_params = MergeTree.eng_params + + # pylint: disable=unused-argument + def __init__(self, sign: str = None, version: str = None, order_by: str = None, primary_key: str = None, + partition_by: str = None, sample_by: str = None): + if not order_by and not primary_key: + raise ArgumentError(None, 'Either PRIMARY KEY or ORDER BY must be specified') + super().__init__(locals()) + + +class GraphiteMergeTree(TableEngine): + arg_names = ['config_section'] + eng_params = MergeTree.eng_params + + # pylint: disable=unused-argument + def __init__(self, config_section: str = None, version: str = None, order_by: str = None, primary_key: str = None, + partition_by: str = None, sample_by: str = None): + if not order_by and not primary_key: + raise ArgumentError(None, 'Either PRIMARY KEY or ORDER BY must be specified') + super().__init__(locals()) + + +class ReplicatedMergeTree(TableEngine): + arg_names = ['zk_path', 'replica'] + quoted_args = set(arg_names) + optional_args = quoted_args + eng_params = MergeTree.eng_params + + # pylint: disable=unused-argument + def __init__(self, order_by: str = None, primary_key: str = None, partition_by: str = None, sample_by: str = None, + zk_path: str = None, replica: str = None): + if not order_by and not primary_key: + raise ArgumentError(None, 'Either PRIMARY KEY or ORDER BY must be specified') + super().__init__(locals()) + + +class ReplicatedAggregatingMergeTree(ReplicatedMergeTree): + pass + + +class ReplicatedSummingMergeTree(ReplicatedMergeTree): + pass + + +def build_engine(full_engine: str) -> Optional[TableEngine]: + """ + Factory function to create TableEngine class from ClickHouse full_engine expression + :param full_engine + :return: TableEngine DDL element + """ + if not full_engine: + return None + name = full_engine.split(' ')[0].split('(')[0] + try: + engine_cls = engine_map[name] + except KeyError: + if not name.startswith('System'): + logger.warning('Engine %s not found', name) + return None + engine = engine_cls.__new__(engine_cls) + engine.name = name + engine.full_engine = full_engine + return engine diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/dialect.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/dialect.py new file mode 100644 index 0000000000..5eb1ed0113 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/dialect.py @@ -0,0 +1,115 @@ + +from sqlalchemy.engine.default import DefaultDialect + +from clickhouse_connect import dbapi + +from 
clickhouse_connect.cc_sqlalchemy.inspector import ChInspector +from clickhouse_connect.cc_sqlalchemy.sql import full_table +from clickhouse_connect.cc_sqlalchemy.sql.ddlcompiler import ChDDLCompiler +from clickhouse_connect.cc_sqlalchemy import ischema_names, dialect_name +from clickhouse_connect.cc_sqlalchemy.sql.preparer import ChIdentifierPreparer +from clickhouse_connect.driver.query import quote_identifier, format_str + + +# pylint: disable=too-many-public-methods,no-self-use,unused-argument +class ClickHouseDialect(DefaultDialect): + """ + See :py:class:`sqlalchemy.engine.interfaces` + """ + name = dialect_name + driver = 'connect' + + default_schema_name = 'default' + supports_native_decimal = True + supports_native_boolean = True + returns_unicode_strings = True + postfetch_lastrowid = False + ddl_compiler = ChDDLCompiler + preparer = ChIdentifierPreparer + description_encoding = None + max_identifier_length = 127 + ischema_names = ischema_names + inspector = ChInspector + + # pylint: disable=method-hidden + @classmethod + def dbapi(cls): + return dbapi + + def initialize(self, connection): + pass + + @staticmethod + def get_schema_names(connection, **_): + return [row.name for row in connection.execute('SHOW DATABASES')] + + @staticmethod + def has_database(connection, db_name): + return (connection.execute('SELECT name FROM system.databases ' + + f'WHERE name = {format_str(db_name)}')).rowcount > 0 + + def get_table_names(self, connection, schema=None, **kw): + cmd = 'SHOW TABLES' + if schema: + cmd += ' FROM ' + quote_identifier(schema) + return [row.name for row in connection.execute(cmd)] + + def get_primary_keys(self, connection, table_name, schema=None, **kw): + return [] + + # pylint: disable=arguments-renamed + def get_pk_constraint(self, connection, table_name, schema=None, **kw): + return [] + + def get_foreign_keys(self, connection, table_name, schema=None, **kw): + return [] + + def get_temp_table_names(self, connection, schema=None, **kw): + return [] + + def get_view_names(self, connection, schema=None, **kw): + return [] + + def get_temp_view_names(self, connection, schema=None, **kw): + return [] + + def get_view_definition(self, connection, view_name, schema=None, **kw): + pass + + def get_indexes(self, connection, table_name, schema=None, **kw): + return [] + + def get_unique_constraints(self, connection, table_name, schema=None, **kw): + return [] + + def get_check_constraints(self, connection, table_name, schema=None, **kw): + return [] + + def has_table(self, connection, table_name, schema=None, **_kw): + result = connection.execute(f'EXISTS TABLE {full_table(table_name, schema)}') + row = result.fetchone() + return row[0] == 1 + + def has_sequence(self, connection, sequence_name, schema=None, **_kw): + return False + + def do_begin_twophase(self, connection, xid): + raise NotImplementedError + + def do_prepare_twophase(self, connection, xid): + raise NotImplementedError + + def do_rollback_twophase(self, connection, xid, is_prepared=True, recover=False): + raise NotImplementedError + + def do_commit_twophase(self, connection, xid, is_prepared=True, recover=False): + raise NotImplementedError + + def do_recover_twophase(self, connection): + raise NotImplementedError + + def set_isolation_level(self, dbapi_conn, level): + pass + + def get_isolation_level(self, dbapi_conn): + return None diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/inspector.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/inspector.py 
new file mode 100644 index 0000000000..3ec8f0a109 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/inspector.py @@ -0,0 +1,57 @@ +import sqlalchemy.schema as sa_schema + +from sqlalchemy.engine.reflection import Inspector +from sqlalchemy.orm.exc import NoResultFound + +from clickhouse_connect.cc_sqlalchemy.datatypes.base import sqla_type_from_name +from clickhouse_connect.cc_sqlalchemy.ddl.tableengine import build_engine +from clickhouse_connect.cc_sqlalchemy.sql import full_table +from clickhouse_connect.cc_sqlalchemy import dialect_name as dn + +ch_col_args = ('default_type', 'codec_expression', 'ttl_expression') + + +def get_engine(connection, table_name, schema=None): + result_set = connection.execute( + f"SELECT engine_full FROM system.tables WHERE database = '{schema}' and name = '{table_name}'") + row = next(result_set, None) + if not row: + raise NoResultFound(f'Table {schema}.{table_name} does not exist') + return build_engine(row.engine_full) + + +class ChInspector(Inspector): + + def reflect_table(self, table, include_columns, exclude_columns, *_args, **_kwargs): + schema = table.schema + for col in self.get_columns(table.name, schema): + name = col.pop('name') + if (include_columns and name not in include_columns) or (exclude_columns and name in exclude_columns): + continue + col_type = col.pop('type') + col_args = {f'{dn}_{key}' if key in ch_col_args else key: value for key, value in col.items() if value} + table.append_column(sa_schema.Column(name, col_type, **col_args)) + table.engine = get_engine(self.bind, table.name, schema) + + def get_columns(self, table_name, schema=None, **_kwargs): + table_id = full_table(table_name, schema) + result_set = self.bind.execute(f'DESCRIBE TABLE {table_id}') + if not result_set: + raise NoResultFound(f'Table {full_table} does not exist') + columns = [] + for row in result_set: + sqla_type = sqla_type_from_name(row.type) + col = {'name': row.name, + 'type': sqla_type, + 'nullable': sqla_type.nullable, + 'autoincrement': False, + 'default': row.default_expression, + 'default_type': row.default_type, + 'comment': row.comment, + 'codec_expression': row.codec_expression, + 'ttl_expression': row.ttl_expression} + columns.append(col) + return columns + + +ChInspector.reflecttable = ChInspector.reflect_table # Hack to provide backward compatibility for SQLAlchemy 1.3 diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/__init__.py new file mode 100644 index 0000000000..68becd54d6 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/__init__.py @@ -0,0 +1,15 @@ +from typing import Optional + +from sqlalchemy import Table + +from clickhouse_connect.driver.query import quote_identifier + + +def full_table(table_name: str, schema: Optional[str] = None) -> str: + if table_name.startswith('(') or '.' 
in table_name or not schema: + return quote_identifier(table_name) + return f'{quote_identifier(schema)}.{quote_identifier(table_name)}' + + +def format_table(table: Table): + return full_table(table.name, table.schema) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/ddlcompiler.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/ddlcompiler.py new file mode 100644 index 0000000000..5a97254705 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/ddlcompiler.py @@ -0,0 +1,24 @@ +from sqlalchemy import Column +from sqlalchemy.sql.compiler import DDLCompiler + +from clickhouse_connect.cc_sqlalchemy.sql import format_table +from clickhouse_connect.driver.query import quote_identifier + + +class ChDDLCompiler(DDLCompiler): + + def visit_create_schema(self, create, **_): + return f'CREATE DATABASE {quote_identifier(create.element)}' + + def visit_drop_schema(self, drop, **_): + return f'DROP DATABASE {quote_identifier(drop.element)}' + + def visit_create_table(self, create, **_): + table = create.element + text = f'CREATE TABLE {format_table(table)} (' + text += ', '.join([self.get_column_specification(c.element) for c in create.columns]) + return text + ') ' + table.engine.compile() + + def get_column_specification(self, column: Column, **_): + text = f'{quote_identifier(column.name)} {column.type.compile()}' + return text diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/preparer.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/preparer.py new file mode 100644 index 0000000000..a31b3e7af6 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/preparer.py @@ -0,0 +1,11 @@ +from sqlalchemy.sql.compiler import IdentifierPreparer + +from clickhouse_connect.driver.query import quote_identifier + + +class ChIdentifierPreparer(IdentifierPreparer): + + quote_identifier = staticmethod(quote_identifier) + + def _requires_quotes(self, _value): + return True diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/common.py b/contrib/python/clickhouse-connect/clickhouse_connect/common.py new file mode 100644 index 0000000000..0e4669c7e0 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/common.py @@ -0,0 +1,74 @@ +import sys +from dataclasses import dataclass +from typing import Any, Sequence, Optional, Dict +from clickhouse_connect import __version__ + + +from clickhouse_connect.driver.exceptions import ProgrammingError + + +def version(): + return __version__.version + + +def format_error(msg: str) -> str: + max_size = _common_settings['max_error_size'].value + if max_size: + return msg[:max_size] + return msg + + +@dataclass +class CommonSetting: + name: str + options: Sequence[Any] + default: Any + value: Optional[Any] = None + + +_common_settings: Dict[str, CommonSetting] = {} + + +def build_client_name(client_name: str): + product_name = get_setting('product_name') + product_name = product_name.strip() + ' ' if product_name else '' + client_name = client_name.strip() + ' ' if client_name else '' + py_version = sys.version.split(' ', maxsplit=1)[0] + return f'{client_name}{product_name}clickhouse-connect/{version()} (lv:py/{py_version}; os:{sys.platform})' + + +def get_setting(name: str): + setting = _common_settings.get(name) + if setting is None: + raise ProgrammingError(f'Unrecognized common setting {name}') + return setting.value if setting.value is not None else 
setting.default + + +def set_setting(name: str, value: Any): + setting = _common_settings.get(name) + if setting is None: + raise ProgrammingError(f'Unrecognized common setting {name}') + if setting.options and value not in setting.options: + raise ProgrammingError(f'Unrecognized option {value} for setting {name})') + if value == setting.default: + setting.value = None + else: + setting.value = value + + +def _init_common(name: str, options: Sequence[Any], default: Any): + _common_settings[name] = CommonSetting(name, options, default) + + +_init_common('autogenerate_session_id', (True, False), True) +_init_common('dict_parameter_format', ('json', 'map'), 'json') +_init_common('invalid_setting_action', ('send', 'drop', 'error'), 'error') +_init_common('max_connection_age', (), 10 * 60) # Max time in seconds to keep reusing a database TCP connection +_init_common('product_name', (), '') # Product name used as part of client identification for ClickHouse query_log +_init_common('readonly', (0, 1), 0) # Implied "read_only" ClickHouse settings for versions prior to 19.17 + +# Use the client protocol version This is needed for DateTime timezone columns but breaks with current version of +# chproxy +_init_common('use_protocol_version', (True, False), True) + +_init_common('max_error_size', (), 1024) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/__init__.py new file mode 100644 index 0000000000..aa9b8c2f5d --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/__init__.py @@ -0,0 +1,7 @@ +import clickhouse_connect.datatypes.container +import clickhouse_connect.datatypes.network +import clickhouse_connect.datatypes.numeric +import clickhouse_connect.datatypes.special +import clickhouse_connect.datatypes.string +import clickhouse_connect.datatypes.temporal +import clickhouse_connect.datatypes.registry diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/base.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/base.py new file mode 100644 index 0000000000..b1990280eb --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/base.py @@ -0,0 +1,364 @@ +import array +import logging + +from abc import ABC +from math import log +from typing import NamedTuple, Dict, Type, Any, Sequence, MutableSequence, Optional, Union, Collection + +from clickhouse_connect.driver.common import array_type, int_size, write_array, write_uint64, low_card_version +from clickhouse_connect.driver.context import BaseQueryContext +from clickhouse_connect.driver.ctypes import numpy_conv, data_conv +from clickhouse_connect.driver.exceptions import NotSupportedError +from clickhouse_connect.driver.insert import InsertContext +from clickhouse_connect.driver.query import QueryContext +from clickhouse_connect.driver.types import ByteSource +from clickhouse_connect.driver.options import np, pd + +logger = logging.getLogger(__name__) +ch_read_formats = {} +ch_write_formats = {} + + +class TypeDef(NamedTuple): + """ + Immutable tuple that contains all additional information needed to construct a particular ClickHouseType + """ + wrappers: tuple = () + keys: tuple = () + values: tuple = () + + @property + def arg_str(self): + return f"({', '.join(str(v) for v in self.values)})" if self.values else '' + + +class ClickHouseType(ABC): + """ + Base class for all ClickHouseType objects. 
+ """ + __slots__ = 'nullable', 'low_card', 'wrappers', 'type_def', '__dict__' + _name_suffix = '' + encoding = 'utf8' + np_type = 'O' # Default to Numpy Object type + nano_divisor = 0 # Only relevant for date like objects + byte_size = 0 + valid_formats = 'native' + + python_type = None + base_type = None + + def __init_subclass__(cls, registered: bool = True): + if registered: + cls.base_type = cls.__name__ + type_map[cls.base_type] = cls + + @classmethod + def build(cls: Type['ClickHouseType'], type_def: TypeDef): + return cls(type_def) + + @classmethod + def _active_format(cls, fmt_map: Dict[Type['ClickHouseType'], str], ctx: BaseQueryContext): + ctx_fmt = ctx.active_fmt(cls.base_type) + if ctx_fmt: + return ctx_fmt + return fmt_map.get(cls, 'native') + + @classmethod + def read_format(cls, ctx: BaseQueryContext): + return cls._active_format(ch_read_formats, ctx) + + @classmethod + def write_format(cls, ctx: BaseQueryContext): + return cls._active_format(ch_write_formats, ctx) + + def __init__(self, type_def: TypeDef): + """ + Base class constructor that sets Nullable and LowCardinality wrappers + :param type_def: ClickHouseType base configuration parameters + """ + self.type_def = type_def + self.wrappers = type_def.wrappers + self.low_card = 'LowCardinality' in self.wrappers + self.nullable = 'Nullable' in self.wrappers + + def __eq__(self, other): + return other.__class__ == self.__class__ and self.type_def == other.type_def + + def __hash__(self): + return hash((self.type_def, self.__class__)) + + @property + def name(self): + name = f'{self.base_type}{self._name_suffix}' + for wrapper in reversed(self.wrappers): + name = f'{wrapper}({name})' + return name + + def data_size(self, sample: Sequence) -> int: + if self.low_card: + values = set(sample) + d_size = self._data_size(values) + 2 + else: + d_size = self._data_size(sample) + if self.nullable: + d_size += 1 + return d_size + + def _data_size(self, _sample: Collection) -> int: + if self.byte_size: + return self.byte_size + return 0 + + def write_column_prefix(self, dest: bytearray): + """ + Prefix is primarily used is for the LowCardinality version (but see the JSON data type). Because of the + way the ClickHouse C++ code is written, this must be done before any data is written even if the + LowCardinality column is within a container. The only recognized low cardinality version is 1 + :param dest: The native protocol binary write buffer + """ + if self.low_card: + write_uint64(low_card_version, dest) + + def read_column_prefix(self, source: ByteSource): + """ + Read the low cardinality version. Like the write method, this has to happen immediately for container classes + :param source: The native protocol binary read buffer + :return: updated read pointer + """ + if self.low_card: + v = source.read_uint64() + if v != low_card_version: + logger.warning('Unexpected low cardinality version %d reading type %s', v, self.name) + + def read_column(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence: + """ + Wrapping read method for all ClickHouseType data types. 
Only overridden for container classes so that + the LowCardinality version is read for the contained types + :param source: Native protocol binary read buffer + :param num_rows: Number of rows expected in the column + :param ctx: QueryContext for query specific settings + :return: The decoded column data as a sequence and the updated location pointer + """ + self.read_column_prefix(source) + return self.read_column_data(source, num_rows, ctx) + + def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence: + """ + Public read method for all ClickHouseType data type columns + :param source: Native protocol binary read buffer + :param num_rows: Number of rows expected in the column + :param ctx: QueryContext for query specific settings + :return: The decoded column plus the updated location pointer + """ + if self.low_card: + column = self._read_low_card_column(source, num_rows, ctx) + elif self.nullable: + column = self._read_nullable_column(source, num_rows, ctx) + else: + column = self._read_column_binary(source, num_rows, ctx) + return self._finalize_column(column, ctx) + + def _read_nullable_column(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence: + null_map = source.read_bytes(num_rows) + column = self._read_column_binary(source, num_rows, ctx) + null_obj = self._active_null(ctx) + return data_conv.build_nullable_column(column, null_map, null_obj) + + # The binary methods are really abstract, but they aren't implemented for container classes which + # delegate binary operations to their elements + + # pylint: disable=no-self-use + def _read_column_binary(self, + _source: ByteSource, + _num_rows: int, _ctx: QueryContext) -> Union[Sequence, MutableSequence]: + """ + Lowest level read method for ClickHouseType native data columns + :param _source: Native protocol binary read buffer + :param _num_rows: Expected number of rows in the column + :return: Decoded column plus updated read buffer + """ + return [], 0 + + def _finalize_column(self, column: Sequence, _ctx: QueryContext) -> Sequence: + return column + + def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext): + """ + Lowest level write method for ClickHouseType data columns + :param column: Python data column + :param dest: Native protocol write buffer + :param ctx: Insert Context with insert specific settings + """ + + def write_column(self, column: Sequence, dest: bytearray, ctx: InsertContext): + """ + Wrapping write method for ClickHouseTypes. Only overridden for container types that so that + the write_native_prefix is done at the right time for contained types + :param column: Column/sequence of Python values to write + :param dest: Native binary write buffer + :param ctx: Insert Context with insert specific settings + """ + self.write_column_prefix(dest) + self.write_column_data(column, dest, ctx) + + def write_column_data(self, column: Sequence, dest: bytearray, ctx: InsertContext): + """ + Public native write method for ClickHouseTypes. 
Delegates the actual write to either the LowCardinality + write method or the _write_native_binary method of the type + :param column: Sequence of Python data + :param dest: Native binary write buffer + :param ctx: Insert Context with insert specific settings + """ + if self.low_card: + self._write_column_low_card(column, dest, ctx) + else: + if self.nullable: + dest += bytes([1 if x is None else 0 for x in column]) + self._write_column_binary(column, dest, ctx) + + # pylint: disable=no-member + def _read_low_card_column(self, source: ByteSource, num_rows: int, ctx: QueryContext): + if num_rows == 0: + return [] + key_data = source.read_uint64() + key_sz = 2 ** (key_data & 0xff) + index_cnt = source.read_uint64() + index = self._read_column_binary(source, index_cnt, ctx) + key_cnt = source.read_uint64() + keys = source.read_array(array_type(key_sz, False), key_cnt) + if self.nullable: + return self._build_lc_nullable_column(index, keys, ctx) + return self._build_lc_column(index, keys, ctx) + + def _build_lc_column(self, index: Sequence, keys: array.array, _ctx: QueryContext): + return [index[key] for key in keys] + + def _build_lc_nullable_column(self, index: Sequence, keys: array.array, ctx: QueryContext): + return data_conv.build_lc_nullable_column(index, keys, self._active_null(ctx)) + + def _write_column_low_card(self, column: Sequence, dest: bytearray, ctx: InsertContext): + if len(column) == 0: + return + keys = [] + index = [] + rev_map = {} + rmg = rev_map.get + if self.nullable: + index.append(None) + key = 1 + for x in column: + if x is None: + keys.append(0) + else: + ix = rmg(x) + if ix is None: + keys.append(key) + index.append(x) + rev_map[x] = key + key += 1 + else: + keys.append(ix) + else: + key = 0 + for x in column: + ix = rmg(x) + if ix is None: + keys.append(key) + index.append(x) + rev_map[x] = key + key += 1 + else: + keys.append(ix) + ix_type = int(log(len(index), 2)) >> 3 # power of two bytes needed to store the total number of keys + write_uint64((1 << 9) | (1 << 10) | ix_type, dest) # Index type plus new dictionary (9) and additional keys(10) + write_uint64(len(index), dest) + self._write_column_binary(index, dest, ctx) + write_uint64(len(keys), dest) + write_array(array_type(1 << ix_type, False), keys, dest) + + def _active_null(self, _ctx: QueryContext) -> Any: + return None + + def _first_value(self, column: Sequence) -> Optional[Any]: + if self.nullable: + return next((x for x in column if x is not None), None) + if len(column): + return column[0] + return None + + +EMPTY_TYPE_DEF = TypeDef() +NULLABLE_TYPE_DEF = TypeDef(wrappers=('Nullable',)) +LC_TYPE_DEF = TypeDef(wrappers=('LowCardinality',)) +type_map: Dict[str, Type[ClickHouseType]] = {} + + +class ArrayType(ClickHouseType, ABC, registered=False): + """ + ClickHouse type that utilizes Python or Numpy arrays for fast reads and writes of binary data. 
+ arrays can only be used for ClickHouse types that can be translated into UInt64 (and smaller) integers + or Float32/64 + """ + _signed = True + _array_type = None + _struct_type = None + valid_formats = 'string', 'native' + python_type = int + + def __init_subclass__(cls, registered: bool = True): + super().__init_subclass__(registered) + if cls._array_type in ('i', 'I') and int_size == 2: + cls._array_type = 'L' if cls._array_type.isupper() else 'l' + if isinstance(cls._array_type, str) and cls._array_type: + cls._struct_type = '<' + cls._array_type + cls.byte_size = array.array(cls._array_type).itemsize + + def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext): + if ctx.use_numpy: + return numpy_conv.read_numpy_array(source, self.np_type, num_rows) + return source.read_array(self._array_type, num_rows) + + def _read_nullable_column(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence: + return data_conv.read_nullable_array(source, self._array_type, num_rows, self._active_null(ctx)) + + def _build_lc_column(self, index: Sequence, keys: array.array, ctx: QueryContext): + if ctx.use_numpy: + return np.fromiter((index[key] for key in keys), dtype=index.dtype, count=len(index)) + return super()._build_lc_column(index, keys, ctx) + + def _finalize_column(self, column: Sequence, ctx: QueryContext) -> Sequence: + if self.read_format(ctx) == 'string': + return [str(x) for x in column] + if ctx.use_extended_dtypes and self.nullable: + return pd.array(column, dtype=self.base_type) + if ctx.use_numpy and self.nullable and (not ctx.use_none): + return np.array(column, dtype=self.np_type) + return column + + def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext): + if len(column) and self.nullable: + column = [0 if x is None else x for x in column] + write_array(self._array_type, column, dest) + + def _active_null(self, ctx: QueryContext): + if ctx.as_pandas and ctx.use_extended_dtypes: + return pd.NA + if ctx.use_none: + return None + return 0 + + +class UnsupportedType(ClickHouseType, ABC, registered=False): + """ + Base class for ClickHouse types that can't be serialized/deserialized into Python types. 
+ Mostly useful just for DDL statements + """ + def __init__(self, type_def: TypeDef): + super().__init__(type_def) + self._name_suffix = type_def.arg_str + + def _read_column_binary(self, source: Sequence, num_rows: int, ctx: QueryContext): + raise NotSupportedError(f'{self.name} deserialization not supported') + + def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext): + raise NotSupportedError(f'{self.name} serialization not supported') diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/container.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/container.py new file mode 100644 index 0000000000..0281867b8d --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/container.py @@ -0,0 +1,308 @@ +import array +import logging +from typing import Sequence, Collection + +from clickhouse_connect.driver.insert import InsertContext +from clickhouse_connect.driver.query import QueryContext +from clickhouse_connect.driver.types import ByteSource +from clickhouse_connect.json_impl import any_to_json +from clickhouse_connect.datatypes.base import ClickHouseType, TypeDef +from clickhouse_connect.driver.common import must_swap +from clickhouse_connect.datatypes.registry import get_from_name + +logger = logging.getLogger(__name__) + + +class Array(ClickHouseType): + __slots__ = ('element_type',) + python_type = list + + def __init__(self, type_def: TypeDef): + super().__init__(type_def) + self.element_type = get_from_name(type_def.values[0]) + self._name_suffix = f'({self.element_type.name})' + + def read_column_prefix(self, source: ByteSource): + return self.element_type.read_column_prefix(source) + + def _data_size(self, sample: Sequence) -> int: + if len(sample) == 0: + return 8 + total = 0 + for x in sample: + total += self.element_type.data_size(x) + return total // len(sample) + 8 + + # pylint: disable=too-many-locals + def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext): + final_type = self.element_type + depth = 1 + while isinstance(final_type, Array): + depth += 1 + final_type = final_type.element_type + level_size = num_rows + offset_sizes = [] + for _ in range(depth): + level_offsets = source.read_array('Q', level_size) + offset_sizes.append(level_offsets) + level_size = level_offsets[-1] if level_offsets else 0 + if level_size: + all_values = final_type.read_column_data(source, level_size, ctx) + else: + all_values = [] + column = all_values if isinstance(all_values, list) else list(all_values) + for offset_range in reversed(offset_sizes): + data = [] + last = 0 + for x in offset_range: + data.append(column[last: x]) + last = x + column = data + return column + + def write_column_prefix(self, dest: bytearray): + self.element_type.write_column_prefix(dest) + + def write_column_data(self, column: Sequence, dest: bytearray, ctx: InsertContext): + final_type = self.element_type + depth = 1 + while isinstance(final_type, Array): + depth += 1 + final_type = final_type.element_type + for _ in range(depth): + total = 0 + data = [] + offsets = array.array('Q') + for x in column: + total += len(x) + offsets.append(total) + data.extend(x) + if must_swap: + offsets.byteswap() + dest += offsets.tobytes() + column = data + final_type.write_column_data(column, dest, ctx) + + +class Tuple(ClickHouseType): + _slots = 'element_names', 'element_types' + python_type = tuple + valid_formats = 'tuple', 'dict', 'json', 'native' # native is 'tuple' for unnamed 
tuples, and dict for named tuples + + def __init__(self, type_def: TypeDef): + super().__init__(type_def) + self.element_names = type_def.keys + self.element_types = [get_from_name(name) for name in type_def.values] + if self.element_names: + self._name_suffix = f"({', '.join(k + ' ' + str(v) for k, v in zip(type_def.keys, type_def.values))})" + else: + self._name_suffix = type_def.arg_str + + def _data_size(self, sample: Collection) -> int: + if len(sample) == 0: + return 0 + elem_size = 0 + is_dict = self.element_names and isinstance(self._first_value(list(sample)), dict) + for ix, e_type in enumerate(self.element_types): + if e_type.byte_size > 0: + elem_size += e_type.byte_size + elif is_dict: + elem_size += e_type.data_size([x.get(self.element_names[ix], None) for x in sample]) + else: + elem_size += e_type.data_size([x[ix] for x in sample]) + return elem_size + + def read_column_prefix(self, source: ByteSource): + for e_type in self.element_types: + e_type.read_column_prefix(source) + + def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext): + columns = [] + e_names = self.element_names + for e_type in self.element_types: + column = e_type.read_column_data(source, num_rows, ctx) + columns.append(column) + if e_names and self.read_format(ctx) != 'tuple': + dicts = [{} for _ in range(num_rows)] + for ix, x in enumerate(dicts): + for y, key in enumerate(e_names): + x[key] = columns[y][ix] + if self.read_format(ctx) == 'json': + to_json = any_to_json + return [to_json(x) for x in dicts] + return dicts + return tuple(zip(*columns)) + + def write_column_prefix(self, dest: bytearray): + for e_type in self.element_types: + e_type.write_column_prefix(dest) + + def write_column_data(self, column: Sequence, dest: bytearray, ctx: InsertContext): + if self.element_names and isinstance(self._first_value(column), dict): + columns = self.convert_dict_insert(column) + else: + columns = list(zip(*column)) + for e_type, elem_column in zip(self.element_types, columns): + e_type.write_column_data(elem_column, dest, ctx) + + def convert_dict_insert(self, column: Sequence) -> Sequence: + names = self.element_names + col = [[] for _ in names] + for x in column: + for ix, name in enumerate(names): + col[ix].append(x.get(name)) + return col + + +class Map(ClickHouseType): + _slots = 'key_type', 'value_type' + python_type = dict + + def __init__(self, type_def: TypeDef): + super().__init__(type_def) + self.key_type = get_from_name(type_def.values[0]) + self.value_type = get_from_name(type_def.values[1]) + self._name_suffix = type_def.arg_str + + def _data_size(self, sample: Collection) -> int: + total = 0 + if len(sample) == 0: + return 0 + for x in sample: + total += self.key_type.data_size(x.keys()) + total += self.value_type.data_size(x.values()) + return total // len(sample) + + def read_column_prefix(self, source: ByteSource): + self.key_type.read_column_prefix(source) + self.value_type.read_column_prefix(source) + + # pylint: disable=too-many-locals + def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext): + offsets = source.read_array('Q', num_rows) + total_rows = 0 if len(offsets) == 0 else offsets[-1] + keys = self.key_type.read_column_data(source, total_rows, ctx) + values = self.value_type.read_column_data(source, total_rows, ctx) + all_pairs = tuple(zip(keys, values)) + column = [] + app = column.append + last = 0 + for offset in offsets: + app(dict(all_pairs[last: offset])) + last = offset + return column + + def write_column_prefix(self, dest: 
bytearray): + self.key_type.write_column_prefix(dest) + self.value_type.write_column_prefix(dest) + + def write_column_data(self, column: Sequence, dest: bytearray, ctx: InsertContext): + offsets = array.array('Q') + keys = [] + values = [] + total = 0 + for v in column: + total += len(v) + offsets.append(total) + keys.extend(v.keys()) + values.extend(v.values()) + if must_swap: + offsets.byteswap() + dest += offsets.tobytes() + self.key_type.write_column_data(keys, dest, ctx) + self.value_type.write_column_data(values, dest, ctx) + + +class Nested(ClickHouseType): + __slots__ = 'tuple_array', 'element_names', 'element_types' + python_type = Sequence[dict] + + def __init__(self, type_def): + super().__init__(type_def) + self.element_names = type_def.keys + self.tuple_array = get_from_name(f"Array(Tuple({','.join(type_def.values)}))") + self.element_types = self.tuple_array.element_type.element_types + cols = [f'{x[0]} {x[1].name}' for x in zip(type_def.keys, self.element_types)] + self._name_suffix = f"({', '.join(cols)})" + + def _data_size(self, sample: Collection) -> int: + keys = self.element_names + array_sample = [[tuple(sub_row[key] for key in keys) for sub_row in row] for row in sample] + return self.tuple_array.data_size(array_sample) + + def read_column_prefix(self, source: ByteSource): + self.tuple_array.read_column_prefix(source) + + def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext): + keys = self.element_names + data = self.tuple_array.read_column_data(source, num_rows, ctx) + return [[dict(zip(keys, x)) for x in row] for row in data] + + def write_column_prefix(self, dest: bytearray): + self.tuple_array.write_column_prefix(dest) + + def write_column_data(self, column: Sequence, dest: bytearray, ctx: InsertContext): + keys = self.element_names + data = [[tuple(sub_row[key] for key in keys) for sub_row in row] for row in column] + self.tuple_array.write_column_data(data, dest, ctx) + + +class JSON(ClickHouseType): + python_type = dict + # Native is a Python type (primitive, dict, array), string is an actual JSON string + valid_formats = 'string', 'native' + + def write_column_prefix(self, dest: bytearray): + dest.append(0x01) + + def _data_size(self, sample: Collection) -> int: + if len(sample) == 0: + return 0 + total = 0 + for x in sample: + if isinstance(x, str): + total += len(x) + elif x: + total += len(any_to_json(x)) + return total // len(sample) + 1 + + # pylint: disable=duplicate-code + def write_column_data(self, column: Sequence, dest: bytearray, ctx: InsertContext): + app = dest.append + first = self._first_value(column) + if isinstance(first, str) or self.write_format(ctx) == 'string': + for x in column: + v = x.encode() + sz = len(v) + while True: + b = sz & 0x7f + sz >>= 7 + if sz == 0: + app(b) + break + app(0x80 | b) + dest += v + else: + to_json = any_to_json + for x in column: + v = to_json(x) + sz = len(v) + while True: + b = sz & 0x7f + sz >>= 7 + if sz == 0: + app(b) + break + app(0x80 | b) + dest += v + + +class Object(JSON): + python_type = dict + + def __init__(self, type_def): + data_type = type_def.values[0].lower().replace(' ', '') + if data_type not in ("'json'", "nullable('json')"): + raise NotImplementedError('Only json or Nullable(json) Object type is currently supported') + super().__init__(type_def) + self._name_suffix = type_def.arg_str diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/format.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/format.py new file mode 
100644 index 0000000000..70f8ca8489 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/format.py @@ -0,0 +1,79 @@ +import re + +from typing import Dict, Type, Sequence, Optional + +from clickhouse_connect.datatypes.base import ClickHouseType, type_map, ch_read_formats, ch_write_formats +from clickhouse_connect.driver.exceptions import ProgrammingError + +json_re = re.compile('json', re.IGNORECASE) + + +def set_default_formats(*args, **kwargs): + fmt_map = format_map(_convert_arguments(*args, **kwargs)) + ch_read_formats.update(fmt_map) + ch_write_formats.update(fmt_map) + + +def clear_all_formats(): + ch_read_formats.clear() + ch_write_formats.clear() + + +def clear_default_format(pattern: str): + for ch_type in _matching_types(pattern): + ch_read_formats.pop(ch_type, None) + ch_write_formats.pop(ch_type, None) + + +def set_write_format(pattern: str, fmt: str): + pattern = json_re.sub('object', pattern) + for ch_type in _matching_types(pattern): + ch_write_formats[ch_type] = fmt + + +def clear_write_format(pattern: str): + for ch_type in _matching_types(pattern): + ch_write_formats.pop(ch_type, None) + + +def set_read_format(pattern: str, fmt: str): + for ch_type in _matching_types(pattern): + ch_read_formats[ch_type] = fmt + + +def clear_read_format(pattern: str): + for ch_type in _matching_types(pattern): + ch_read_formats.pop(ch_type, None) + + +def format_map(fmt_map: Optional[Dict[str, str]]) -> Dict[Type[ClickHouseType], str]: + if not fmt_map: + return {} + final_map = {} + for pattern, fmt in fmt_map.items(): + for ch_type in _matching_types(pattern, fmt): + final_map[ch_type] = fmt + return final_map + + +def _convert_arguments(*args, **kwargs) -> Dict[str, str]: + fmt_map = {} + try: + for x in range(0, len(args), 2): + fmt_map[args[x]] = args[x + 1] + except (IndexError, TypeError, ValueError) as ex: + raise ProgrammingError('Invalid type/format arguments for format method') from ex + fmt_map.update(kwargs) + return fmt_map + + +def _matching_types(pattern: str, fmt: str = None) -> Sequence[Type[ClickHouseType]]: + re_pattern = re.compile(pattern.replace('*', '.*'), re.IGNORECASE) + matches = [ch_type for type_name, ch_type in type_map.items() if re_pattern.match(type_name)] + if not matches: + raise ProgrammingError(f'Unrecognized ClickHouse type {pattern} when setting formats') + if fmt: + invalid = [ch_type.__name__ for ch_type in matches if fmt not in ch_type.valid_formats] + if invalid: + raise ProgrammingError(f"{fmt} is not a valid format for ClickHouse types {','.join(invalid)}.") + return matches diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/network.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/network.py new file mode 100644 index 0000000000..14b7bc3b9a --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/network.py @@ -0,0 +1,129 @@ +import socket +from ipaddress import IPv4Address, IPv6Address +from typing import Union, MutableSequence, Sequence + +from clickhouse_connect.datatypes.base import ClickHouseType +from clickhouse_connect.driver.common import write_array, int_size +from clickhouse_connect.driver.insert import InsertContext +from clickhouse_connect.driver.query import QueryContext +from clickhouse_connect.driver.types import ByteSource +from clickhouse_connect.driver.ctypes import data_conv + +IPV4_V6_MASK = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff' +V6_NULL = bytes(b'\x00' * 16) + + +# pylint: disable=protected-access +class 
IPv4(ClickHouseType): + _array_type = 'L' if int_size == 2 else 'I' + valid_formats = 'string', 'native', 'int' + python_type = IPv4Address + byte_size = 4 + + def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext): + if self.read_format(ctx) == 'int': + return source.read_array(self._array_type, num_rows) + if self.read_format(ctx) == 'string': + column = source.read_array(self._array_type, num_rows) + return [socket.inet_ntoa(x.to_bytes(4, 'big')) for x in column] + return data_conv.read_ipv4_col(source, num_rows) + + def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext): + first = self._first_value(column) + if isinstance(first, str): + fixed = 24, 16, 8, 0 + # pylint: disable=consider-using-generator + column = [(sum([int(b) << fixed[ix] for ix, b in enumerate(x.split('.'))])) if x else 0 for x in column] + else: + if self.nullable: + column = [x._ip if x else 0 for x in column] + else: + column = [x._ip for x in column] + write_array(self._array_type, column, dest) + + def _active_null(self, ctx: QueryContext): + fmt = self.read_format(ctx) + if ctx.use_none: + return None + if fmt == 'string': + return '0.0.0.0' + if fmt == 'int': + return 0 + return None + + +# pylint: disable=protected-access +class IPv6(ClickHouseType): + valid_formats = 'string', 'native' + python_type = IPv6Address + byte_size = 16 + + def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext): + if self.read_format(ctx) == 'string': + return self._read_binary_str(source, num_rows) + return self._read_binary_ip(source, num_rows) + + @staticmethod + def _read_binary_ip(source: ByteSource, num_rows: int): + fast_ip_v6 = IPv6Address.__new__ + fast_ip_v4 = IPv4Address.__new__ + with_scope_id = '_scope_id' in IPv6Address.__slots__ + new_col = [] + app = new_col.append + ifb = int.from_bytes + for _ in range(num_rows): + int_value = ifb(source.read_bytes(16), 'big') + if int_value >> 32 == 0xFFFF: + ipv4 = fast_ip_v4(IPv4Address) + ipv4._ip = int_value & 0xFFFFFFFF + app(ipv4) + else: + ipv6 = fast_ip_v6(IPv6Address) + ipv6._ip = int_value + if with_scope_id: + ipv6._scope_id = None + app(ipv6) + return new_col + + @staticmethod + def _read_binary_str(source: ByteSource, num_rows: int): + new_col = [] + app = new_col.append + v4mask = IPV4_V6_MASK + tov4 = socket.inet_ntoa + tov6 = socket.inet_ntop + af6 = socket.AF_INET6 + for _ in range(num_rows): + x = source.read_bytes(16) + if x[:12] == v4mask: + app(tov4(x[12:])) + else: + app(tov6(af6, x)) + return new_col + + def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext): + v = V6_NULL + first = self._first_value(column) + v4mask = IPV4_V6_MASK + af6 = socket.AF_INET6 + tov6 = socket.inet_pton + if isinstance(first, str): + for x in column: + if x is None: + dest += v + elif '.' 
in x: + dest += v4mask + bytes(int(b) for b in x.split('.')) + else: + dest += tov6(af6, x) + else: + for x in column: + if x is None: + dest += v + else: + b = x.packed + dest += b if len(b) == 16 else (v4mask + b) + + def _active_null(self, ctx): + if ctx.use_none: + return None + return '::' if self.read_format(ctx) == 'string' else V6_NULL diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/numeric.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/numeric.py new file mode 100644 index 0000000000..7796098785 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/numeric.py @@ -0,0 +1,357 @@ +import decimal +from typing import Union, Type, Sequence, MutableSequence + +from math import nan + +from clickhouse_connect.datatypes.base import TypeDef, ArrayType, ClickHouseType +from clickhouse_connect.driver.common import array_type, write_array, decimal_size, decimal_prec +from clickhouse_connect.driver.ctypes import numpy_conv, data_conv +from clickhouse_connect.driver.insert import InsertContext +from clickhouse_connect.driver.options import pd, np +from clickhouse_connect.driver.query import QueryContext +from clickhouse_connect.driver.types import ByteSource + + +class Int8(ArrayType): + _array_type = 'b' + np_type = 'b' + + +class UInt8(ArrayType): + _array_type = 'B' + np_type = 'B' + + +class Int16(ArrayType): + _array_type = 'h' + np_type = '<i2' + + +class UInt16(ArrayType): + _array_type = 'H' + np_type = '<u2' + + +class Int32(ArrayType): + _array_type = 'i' + np_type = '<i4' + + +class UInt32(ArrayType): + _array_type = 'I' + np_type = '<u4' + + +class Int64(ArrayType): + _array_type = 'q' + np_type = '<i8' + + +class UInt64(ArrayType): + valid_formats = 'signed', 'native' + _array_type = 'Q' + np_type = '<u8' + python_type = int + + def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext): + fmt = self.read_format(ctx) + if ctx.use_numpy: + np_type = '<q' if fmt == 'signed' else '<u8' + return numpy_conv.read_numpy_array(source, np_type, num_rows) + arr_type = 'q' if fmt == 'signed' else 'Q' + return source.read_array(arr_type, num_rows) + + def _read_nullable_column(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence: + return data_conv.read_nullable_array(source, 'q' if self.read_format(ctx) == 'signed' else 'Q', + num_rows, self._active_null(ctx)) + + def _finalize_column(self, column: Sequence, ctx: QueryContext) -> Sequence: + fmt = self.read_format(ctx) + if fmt == 'string': + return [str(x) for x in column] + if ctx.use_extended_dtypes and self.nullable: + return pd.array(column, dtype='Int64' if fmt == 'signed' else 'UInt64') + if ctx.use_numpy and self.nullable and (not ctx.use_none): + return np.array(column, dtype='<q' if fmt == 'signed' else '<u8') + return column + + +class BigInt(ClickHouseType, registered=False): + _signed = True + valid_formats = 'string', 'native' + + def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext): + signed = self._signed + sz = self.byte_size + column = [] + app = column.append + ifb = int.from_bytes + if self.read_format(ctx) == 'string': + for _ in range(num_rows): + app(str(ifb(source.read_bytes(sz), 'little', signed=signed))) + else: + for _ in range(num_rows): + app(ifb(source.read_bytes(sz), 'little', signed=signed)) + return column + + # pylint: disable=too-many-branches + def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext): + if 
len(column) == 0: + return + first = self._first_value(column) + sz = self.byte_size + signed = self._signed + empty = bytes(b'\x00' * sz) + ext = dest.extend + if isinstance(first, str) or self.write_format(ctx) == 'string': + if self.nullable: + for x in column: + if x: + ext(int(x).to_bytes(sz, 'little', signed=signed)) + else: + ext(empty) + else: + for x in column: + ext(int(x).to_bytes(sz, 'little', signed=signed)) + else: + if self.nullable: + for x in column: + if x: + ext(x.to_bytes(sz, 'little', signed=signed)) + else: + ext(empty) + else: + for x in column: + ext(x.to_bytes(sz, 'little', signed=signed)) + + +class Int128(BigInt): + byte_size = 16 + _signed = True + + +class UInt128(BigInt): + byte_size = 16 + _signed = False + + +class Int256(BigInt): + byte_size = 32 + _signed = True + + +class UInt256(BigInt): + byte_size = 32 + _signed = False + + +class Float(ArrayType, registered=False): + _array_type = 'f' + python_type = float + + def _finalize_column(self, column: Sequence, ctx: QueryContext) -> Sequence: + if self.read_format(ctx) == 'string': + return [str(x) for x in column] + if ctx.use_numpy and self.nullable and (not ctx.use_none): + return np.array(column, dtype=self.np_type) + return column + + def _active_null(self, ctx: QueryContext): + if ctx.use_extended_dtypes: + return nan + if ctx.use_none: + return None + if ctx.use_numpy: + return nan + return 0.0 + + +class Float32(Float): + np_type = '<f4' + + +class Float64(Float): + _array_type = 'd' + np_type = '<f8' + + +class Bool(ClickHouseType): + np_type = '?' + python_type = bool + byte_size = 1 + + def _read_column_binary(self, source: ByteSource, num_rows: int, _ctx: QueryContext): + column = source.read_bytes(num_rows) + return [b != 0 for b in column] + + def _finalize_column(self, column: Sequence, ctx: QueryContext) -> Sequence: + if ctx.use_numpy: + return np.array(column) + return column + + def _write_column_binary(self, column, dest, _ctx): + write_array('B', [1 if x else 0 for x in column], dest) + + +class Boolean(Bool): + pass + + +class Enum(ClickHouseType): + __slots__ = '_name_map', '_int_map' + _array_type = 'b' + valid_formats = 'native', 'int' + python_type = str + + def __init__(self, type_def: TypeDef): + super().__init__(type_def) + escaped_keys = [key.replace("'", "\\'") for key in type_def.keys] + self._name_map = dict(zip(type_def.keys, type_def.values)) + self._int_map = dict(zip(type_def.values, type_def.keys)) + val_str = ', '.join(f"'{key}' = {value}" for key, value in zip(escaped_keys, type_def.values)) + self._name_suffix = f'({val_str})' + + def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext): + column = source.read_array(self._array_type, num_rows) + if self.read_format(ctx) == 'int': + return column + lookup = self._int_map.get + return [lookup(x, None) for x in column] + + def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, _ctx): + first = self._first_value(column) + if first is None or not isinstance(first, str): + if self.nullable: + column = [0 if not x else x for x in column] + write_array(self._array_type, column, dest) + else: + lookup = self._name_map.get + write_array(self._array_type, [lookup(x, 0) for x in column], dest) + + +class Enum8(Enum): + _array_type = 'b' + byte_size = 1 + + +class Enum16(Enum): + _array_type = 'h' + byte_size = 2 + + +class Decimal(ClickHouseType): + __slots__ = 'prec', 'scale', '_mult', '_zeros', 'byte_size', '_array_type' + python_type = decimal.Decimal + dec_size = 0 
+ + @classmethod + def build(cls: Type['Decimal'], type_def: TypeDef): + size = cls.dec_size + if size == 0: + prec = type_def.values[0] + scale = type_def.values[1] + size = decimal_size(prec) + else: + prec = decimal_prec[size] + scale = type_def.values[0] + type_cls = BigDecimal if size > 64 else Decimal + return type_cls(type_def, prec, size, scale) + + def __init__(self, type_def: TypeDef, prec, size, scale): + super().__init__(type_def) + self.prec = prec + self.scale = scale + self._mult = 10 ** scale + self.byte_size = size // 8 + self._zeros = bytes([0] * self.byte_size) + self._name_suffix = f'({prec}, {scale})' + self._array_type = array_type(self.byte_size, True) + + def _read_column_binary(self, source: ByteSource, num_rows: int, _ctx: QueryContext): + column = source.read_array(self._array_type, num_rows) + dec = decimal.Decimal + scale = self.scale + prec = self.prec + if scale == 0: + return [dec(str(x)) for x in column] + new_col = [] + app = new_col.append + for x in column: + if x >= 0: + digits = str(x).rjust(prec, '0') + app(dec(f'{digits[:-scale]}.{digits[-scale:]}')) + else: + digits = str(-x).rjust(prec, '0') + app(dec(f'-{digits[:-scale]}.{digits[-scale:]}')) + return new_col + + def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, _ctx): + with decimal.localcontext() as ctx: + ctx.prec = self.prec + dec = decimal.Decimal + mult = self._mult + if self.nullable: + write_array(self._array_type, [int(dec(x) * mult) if x else 0 for x in column], dest) + else: + write_array(self._array_type, [int(dec(x) * mult) for x in column], dest) + + def _active_null(self, ctx: QueryContext): + if ctx.use_none: + return None + digits = str('0').rjust(self.prec, '0') + scale = self.scale + return decimal.Decimal(f'{digits[:-scale]}.{digits[-scale:]}') + + +class BigDecimal(Decimal, registered=False): + def _read_column_binary(self, source: ByteSource, num_rows: int, _ctx): + dec = decimal.Decimal + scale = self.scale + prec = self.prec + column = [] + app = column.append + sz = self.byte_size + ifb = int.from_bytes + if scale == 0: + for _ in range(num_rows): + app(dec(str(ifb(source.read_bytes(sz), 'little', signed=True)))) + return column + for _ in range(num_rows): + x = ifb(source.read_bytes(sz), 'little', signed=True) + if x >= 0: + digits = str(x).rjust(prec, '0') + app(dec(f'{digits[:-scale]}.{digits[-scale:]}')) + else: + digits = str(-x).rjust(prec, '0') + app(dec(f'-{digits[:-scale]}.{digits[-scale:]}')) + return column + + def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, _ctx): + with decimal.localcontext() as ctx: + ctx.prec = self.prec + mult = decimal.Decimal(f"{self._mult}.{'0' * self.scale}") + sz = self.byte_size + itb = int.to_bytes + if self.nullable: + v = self._zeros + for x in column: + dest += v if not x else itb(int(decimal.Decimal(x) * mult), sz, 'little', signed=True) + else: + for x in column: + dest += itb(int(decimal.Decimal(x) * mult), sz, 'little', signed=True) + + +class Decimal32(Decimal): + dec_size = 32 + + +class Decimal64(Decimal): + dec_size = 64 + + +class Decimal128(BigDecimal): + dec_size = 128 + + +class Decimal256(BigDecimal): + dec_size = 256 diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/registry.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/registry.py new file mode 100644 index 0000000000..47da6e05af --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/registry.py @@ -0,0 +1,62 @@ 
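# Illustrative sketch, not part of the committed diff: the scaled-integer wire encoding
# implemented by the Decimal/BigDecimal classes in numeric.py above. On insert a value is
# multiplied by 10**scale and written as a fixed-width little-endian integer; the read path
# reverses the scaling. The scale and sample value below are hypothetical.
import decimal

scale = 4                                    # e.g. a Decimal64(4) column
mult = 10 ** scale
original = decimal.Decimal('123.4567')
wire_value = int(original * mult)            # 1234567 -- the integer actually stored per row
restored = decimal.Decimal(wire_value) / mult
assert restored == original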
+import logging + +from typing import Tuple, Dict +from clickhouse_connect.datatypes.base import TypeDef, ClickHouseType, type_map +from clickhouse_connect.driver.exceptions import InternalError +from clickhouse_connect.driver.parser import parse_enum, parse_callable, parse_columns + +logger = logging.getLogger(__name__) +type_cache: Dict[str, ClickHouseType] = {} + + +def parse_name(name: str) -> Tuple[str, str, TypeDef]: + """ + Converts a ClickHouse type name into the base class and the definition (TypeDef) needed for any + additional instantiation + :param name: ClickHouse type name as returned by clickhouse + :return: The original base name (before arguments), the full name as passed in and the TypeDef object that + captures any additional arguments + """ + base = name + wrappers = [] + keys = tuple() + if base.startswith('LowCardinality'): + wrappers.append('LowCardinality') + base = base[15:-1] + if base.startswith('Nullable'): + wrappers.append('Nullable') + base = base[9:-1] + if base.startswith('Enum'): + keys, values = parse_enum(base) + base = base[:base.find('(')] + elif base.startswith('Nested'): + keys, values = parse_columns(base[6:]) + base = 'Nested' + elif base.startswith('Tuple'): + keys, values = parse_columns(base[5:]) + base = 'Tuple' + else: + try: + base, values, _ = parse_callable(base) + except IndexError: + raise InternalError(f'Can not parse ClickHouse data type: {name}') from None + return base, name, TypeDef(tuple(wrappers), keys, values) + + +def get_from_name(name: str) -> ClickHouseType: + """ + Returns the ClickHouseType instance parsed from the ClickHouse type name. Instances are cached + :param name: ClickHouse type name as returned by ClickHouse in WithNamesAndTypes FORMAT or the Native protocol + :return: The instance of the ClickHouse Type + """ + ch_type = type_cache.get(name, None) + if not ch_type: + base, name, type_def = parse_name(name) + try: + ch_type = type_map[base].build(type_def) + except KeyError: + err_str = f'Unrecognized ClickHouse type base: {base} name: {name}' + logger.error(err_str) + raise InternalError(err_str) from None + type_cache[name] = ch_type + return ch_type diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/special.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/special.py new file mode 100644 index 0000000000..d45abd47ee --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/special.py @@ -0,0 +1,109 @@ +from typing import Union, Sequence, MutableSequence +from uuid import UUID as PYUUID + +from clickhouse_connect.datatypes.base import TypeDef, ClickHouseType, ArrayType, UnsupportedType +from clickhouse_connect.datatypes.registry import get_from_name +from clickhouse_connect.driver.ctypes import data_conv +from clickhouse_connect.driver.insert import InsertContext +from clickhouse_connect.driver.query import QueryContext +from clickhouse_connect.driver.types import ByteSource + +empty_uuid_b = bytes(b'\x00' * 16) + + +class UUID(ClickHouseType): + valid_formats = 'string', 'native' + np_type = 'U36' + byte_size = 16 + + def python_null(self, ctx): + return '' if self.read_format(ctx) == 'string' else PYUUID(int=0) + + def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext): + if self.read_format(ctx) == 'string': + return self._read_binary_str(source, num_rows) + return data_conv.read_uuid_col(source, num_rows) + + @staticmethod + def _read_binary_str(source: ByteSource, num_rows: int): + v = source.read_array('Q', 
num_rows * 2) + column = [] + app = column.append + for i in range(num_rows): + ix = i << 1 + x = f'{(v[ix] << 64 | v[ix + 1]):032x}' + app(f'{x[:8]}-{x[8:12]}-{x[12:16]}-{x[16:20]}-{x[20:]}') + return column + + # pylint: disable=too-many-branches + def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext): + first = self._first_value(column) + empty = empty_uuid_b + if isinstance(first, str) or self.write_format(ctx) == 'string': + for v in column: + if v: + x = int(v, 16) + dest += (x >> 64).to_bytes(8, 'little') + (x & 0xffffffffffffffff).to_bytes(8, 'little') + else: + dest += empty + elif isinstance(first, int): + for x in column: + if x: + dest += (x >> 64).to_bytes(8, 'little') + (x & 0xffffffffffffffff).to_bytes(8, 'little') + else: + dest += empty + elif isinstance(first, PYUUID): + for v in column: + if v: + x = v.int + dest += (x >> 64).to_bytes(8, 'little') + (x & 0xffffffffffffffff).to_bytes(8, 'little') + else: + dest += empty + elif isinstance(first, (bytes, bytearray, memoryview)): + for v in column: + if v: + dest += bytes(reversed(v[:8])) + bytes(reversed(v[8:])) + else: + dest += empty + else: + dest += empty * len(column) + + +class Nothing(ArrayType): + _array_type = 'b' + + def __init__(self, type_def: TypeDef): + super().__init__(type_def) + self.nullable = True + + def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, _ctx): + dest += bytes(0x30 for _ in range(len(column))) + + +class SimpleAggregateFunction(ClickHouseType): + _slots = ('element_type',) + + def __init__(self, type_def: TypeDef): + super().__init__(type_def) + self.element_type: ClickHouseType = get_from_name(type_def.values[1]) + self._name_suffix = type_def.arg_str + self.byte_size = self.element_type.byte_size + + def _data_size(self, sample: Sequence) -> int: + return self.element_type.data_size(sample) + + def read_column_prefix(self, source: ByteSource): + return self.element_type.read_column_prefix(source) + + def write_column_prefix(self, dest: bytearray): + self.element_type.write_column_prefix(dest) + + def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext): + return self.element_type.read_column_data(source, num_rows, ctx) + + def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext): + self.element_type.write_column_data(column, dest, ctx) + + +class AggregateFunction(UnsupportedType): + pass diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/string.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/string.py new file mode 100644 index 0000000000..7f6b7a5a77 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/string.py @@ -0,0 +1,127 @@ +from typing import Sequence, MutableSequence, Union, Collection + +from clickhouse_connect.driver.ctypes import data_conv + +from clickhouse_connect.datatypes.base import ClickHouseType, TypeDef +from clickhouse_connect.driver.exceptions import DataError +from clickhouse_connect.driver.insert import InsertContext +from clickhouse_connect.driver.query import QueryContext +from clickhouse_connect.driver.types import ByteSource +from clickhouse_connect.driver.options import np, pd + + +class String(ClickHouseType): + valid_formats = 'bytes', 'native' + + def _active_encoding(self, ctx): + if self.read_format(ctx) == 'bytes': + return None + if ctx.encoding: + return ctx.encoding + return self.encoding + + def 
_data_size(self, sample: Collection) -> int: + if len(sample) == 0: + return 0 + total = 0 + for x in sample: + if x: + total += len(x) + return total // len(sample) + 1 + + def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext): + return source.read_str_col(num_rows, self._active_encoding(ctx)) + + def _read_nullable_column(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence: + return source.read_str_col(num_rows, self._active_encoding(ctx), True, self._active_null(ctx)) + + def _finalize_column(self, column: Sequence, ctx: QueryContext) -> Sequence: + if ctx.use_extended_dtypes and self.read_format(ctx) == 'native': + return pd.array(column, dtype=pd.StringDtype()) + if ctx.use_numpy and ctx.max_str_len: + return np.array(column, dtype=f'<U{ctx.max_str_len}') + return column + + # pylint: disable=duplicate-code,too-many-nested-blocks,too-many-branches + def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext): + encoding = None + if isinstance(self._first_value(column), str): + encoding = ctx.encoding or self.encoding + data_conv.write_str_col(column, encoding, dest) + + def _active_null(self, ctx): + if ctx.use_none: + return None + if self.read_format(ctx) == 'bytes': + return bytes() + return '' + + +class FixedString(ClickHouseType): + valid_formats = 'string', 'native' + + def __init__(self, type_def: TypeDef): + super().__init__(type_def) + self.byte_size = type_def.values[0] + self._name_suffix = type_def.arg_str + self._empty_bytes = bytes(b'\x00' * self.byte_size) + + def _active_null(self, ctx: QueryContext): + if ctx.use_none: + return None + return self._empty_bytes if self.read_format(ctx) == 'native' else '' + + @property + def np_type(self): + return f'<U{self.byte_size}' + + def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext): + if self.read_format(ctx) == 'string': + return source.read_fixed_str_col(self.byte_size, num_rows, ctx.encoding or self.encoding ) + return source.read_bytes_col(self.byte_size, num_rows) + + # pylint: disable=too-many-branches,duplicate-code + def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext): + ext = dest.extend + sz = self.byte_size + empty = bytes((0,) * sz) + str_enc = str.encode + enc = ctx.encoding or self.encoding + first = self._first_value(column) + if isinstance(first, str) or self.write_format(ctx) == 'string': + if self.nullable: + for x in column: + if x is None: + ext(empty) + else: + try: + b = str_enc(x, enc) + except UnicodeEncodeError: + b = empty + if len(b) > sz: + raise DataError(f'UTF-8 encoded FixedString value {b.hex(" ")} exceeds column size {sz}') + ext(b) + ext(empty[:sz - len(b)]) + else: + for x in column: + try: + b = str_enc(x, enc) + except UnicodeEncodeError: + b = empty + if len(b) > sz: + raise DataError(f'UTF-8 encoded FixedString value {b.hex(" ")} exceeds column size {sz}') + ext(b) + ext(empty[:sz - len(b)]) + elif self.nullable: + for b in column: + if not b: + ext(empty) + elif len(b) != sz: + raise DataError(f'Fixed String binary value {b.hex(" ")} does not match column size {sz}') + else: + ext(b) + else: + for b in column: + if len(b) != sz: + raise DataError(f'Fixed String binary value {b.hex(" ")} does not match column size {sz}') + ext(b) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/temporal.py b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/temporal.py new file 
mode 100644 index 0000000000..da672823b6 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/datatypes/temporal.py @@ -0,0 +1,216 @@ +import pytz + +from datetime import date, datetime, tzinfo +from typing import Union, Sequence, MutableSequence + +from clickhouse_connect.datatypes.base import TypeDef, ClickHouseType +from clickhouse_connect.driver.common import write_array, np_date_types, int_size +from clickhouse_connect.driver.exceptions import ProgrammingError +from clickhouse_connect.driver.ctypes import data_conv, numpy_conv +from clickhouse_connect.driver.insert import InsertContext +from clickhouse_connect.driver.query import QueryContext +from clickhouse_connect.driver.types import ByteSource +from clickhouse_connect.driver.options import np, pd + +epoch_start_date = date(1970, 1, 1) +epoch_start_datetime = datetime(1970, 1, 1) + + +class Date(ClickHouseType): + _array_type = 'H' + np_type = 'datetime64[D]' + nano_divisor = 86400 * 1000000000 + valid_formats = 'native', 'int' + python_type = date + byte_size = 2 + + def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext): + if self.read_format(ctx) == 'int': + return source.read_array(self._array_type, num_rows) + if ctx.use_numpy: + return numpy_conv.read_numpy_array(source, '<u2', num_rows).astype(self.np_type) + return data_conv.read_date_col(source, num_rows) + + def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext): + first = self._first_value(column) + if isinstance(first, int) or self.write_format(ctx) == 'int': + if self.nullable: + column = [x if x else 0 for x in column] + else: + if isinstance(first, datetime): + esd = epoch_start_datetime + else: + esd = epoch_start_date + if self.nullable: + column = [0 if x is None else (x - esd).days for x in column] + else: + column = [(x - esd).days for x in column] + write_array(self._array_type, column, dest) + + def _active_null(self, ctx: QueryContext): + fmt = self.read_format(ctx) + if ctx.use_extended_dtypes: + return pd.NA if fmt == 'int' else pd.NaT + if ctx.use_none: + return None + if fmt == 'int': + return 0 + if ctx.use_numpy: + return np.datetime64(0) + return epoch_start_date + + def _finalize_column(self, column: Sequence, ctx: QueryContext) -> Sequence: + if self.read_format(ctx) == 'int': + return column + if ctx.use_numpy and self.nullable and not ctx.use_none: + return np.array(column, dtype=self.np_type) + return column + + +class Date32(Date): + byte_size = 4 + _array_type = 'l' if int_size == 2 else 'i' + + def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext): + if ctx.use_numpy: + return numpy_conv.read_numpy_array(source, '<i4', num_rows).astype(self.np_type) + if self.read_format(ctx) == 'int': + return source.read_array(self._array_type, num_rows) + return data_conv.read_date32_col(source, num_rows) + + +from_ts_naive = datetime.utcfromtimestamp +from_ts_tz = datetime.fromtimestamp + + +class DateTimeBase(ClickHouseType, registered=False): + __slots__ = ('tzinfo',) + valid_formats = 'native', 'int' + python_type = datetime + + def _active_null(self, ctx: QueryContext): + fmt = self.read_format(ctx) + if ctx.use_extended_dtypes: + return pd.NA if fmt == 'int' else pd.NaT + if ctx.use_none: + return None + if self.read_format(ctx) == 'int': + return 0 + if ctx.use_numpy: + return np.datetime64(0) + return epoch_start_datetime + + +class DateTime(DateTimeBase): + _array_type = 'L' if int_size == 2 else 'I' + np_type = 
'datetime64[s]' + nano_divisor = 1000000000 + byte_size = 4 + + def __init__(self, type_def: TypeDef): + super().__init__(type_def) + self._name_suffix = type_def.arg_str + if len(type_def.values) > 0: + self.tzinfo = pytz.timezone(type_def.values[0][1:-1]) + else: + self.tzinfo = None + + def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext): + if self.read_format(ctx) == 'int': + return source.read_array(self._array_type, num_rows) + active_tz = ctx.active_tz(self.tzinfo) + if ctx.use_numpy: + np_array = numpy_conv.read_numpy_array(source, '<u4', num_rows).astype(self.np_type) + if ctx.as_pandas and active_tz: + return pd.DatetimeIndex(np_array, tz='UTC').tz_convert(active_tz) + return np_array + return data_conv.read_datetime_col(source, num_rows, active_tz) + + def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext): + first = self._first_value(column) + if isinstance(first, int) or self.write_format(ctx) == 'int': + if self.nullable: + column = [x if x else 0 for x in column] + else: + if self.nullable: + column = [int(x.timestamp()) if x else 0 for x in column] + else: + column = [int(x.timestamp()) for x in column] + write_array(self._array_type, column, dest) + + +class DateTime64(DateTimeBase): + __slots__ = 'scale', 'prec', 'unit' + byte_size = 8 + + def __init__(self, type_def: TypeDef): + super().__init__(type_def) + self._name_suffix = type_def.arg_str + self.scale = type_def.values[0] + self.prec = 10 ** self.scale + self.unit = np_date_types.get(self.scale) + if len(type_def.values) > 1: + self.tzinfo = pytz.timezone(type_def.values[1][1:-1]) + else: + self.tzinfo = None + + @property + def np_type(self): + if self.unit: + return f'datetime64{self.unit}' + raise ProgrammingError(f'Cannot use {self.name} as a numpy or Pandas datatype. 
Only milliseconds(3), ' + + 'microseconds(6), or nanoseconds(9) are supported for numpy based queries.') + + @property + def nano_divisor(self): + return 1000000000 // self.prec + + def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext): + if self.read_format(ctx) == 'int': + return source.read_array('q', num_rows) + active_tz = ctx.active_tz(self.tzinfo) + if ctx.use_numpy: + np_array = numpy_conv.read_numpy_array(source, self.np_type, num_rows) + if ctx.as_pandas and active_tz and active_tz != pytz.UTC: + return pd.DatetimeIndex(np_array, tz='UTC').tz_convert(active_tz) + return np_array + column = source.read_array('q', num_rows) + if active_tz and active_tz != pytz.UTC: + return self._read_binary_tz(column, active_tz) + return self._read_binary_naive(column) + + def _read_binary_tz(self, column: Sequence, tz_info: tzinfo): + new_col = [] + app = new_col.append + dt_from = datetime.fromtimestamp + prec = self.prec + for ticks in column: + seconds = ticks // prec + dt_sec = dt_from(seconds, tz_info) + app(dt_sec.replace(microsecond=((ticks - seconds * prec) * 1000000) // prec)) + return new_col + + def _read_binary_naive(self, column: Sequence): + new_col = [] + app = new_col.append + dt_from = datetime.utcfromtimestamp + prec = self.prec + for ticks in column: + seconds = ticks // prec + dt_sec = dt_from(seconds) + app(dt_sec.replace(microsecond=((ticks - seconds * prec) * 1000000) // prec)) + return new_col + + def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext): + first = self._first_value(column) + if isinstance(first, int) or self.write_format(ctx) == 'int': + if self.nullable: + column = [x if x else 0 for x in column] + else: + prec = self.prec + if self.nullable: + column = [((int(x.timestamp()) * 1000000 + x.microsecond) * prec) // 1000000 if x else 0 + for x in column] + else: + column = [((int(x.timestamp()) * 1000000 + x.microsecond) * prec) // 1000000 for x in column] + write_array('q', column, dest) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/dbapi/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/dbapi/__init__.py new file mode 100644 index 0000000000..ea792b4968 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/dbapi/__init__.py @@ -0,0 +1,28 @@ +from typing import Optional + +from clickhouse_connect.dbapi.connection import Connection + + +apilevel = '2.0' # PEP 249 DB API level +threadsafety = 2 # PEP 249 Threads may share the module and connections. +paramstyle = 'pyformat' # PEP 249 Python extended format codes, e.g. 
...WHERE name=%(name)s + + +class Error(Exception): + pass + + +def connect(host: Optional[str] = None, + database: Optional[str] = None, + username: Optional[str] = '', + password: Optional[str] = '', + port: Optional[int] = None, + **kwargs): + secure = kwargs.pop('secure', False) + return Connection(host=host, + database=database, + username=username, + password=password, + port=port, + secure=secure, + **kwargs) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/dbapi/connection.py b/contrib/python/clickhouse-connect/clickhouse_connect/dbapi/connection.py new file mode 100644 index 0000000000..d1b3cb7f3c --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/dbapi/connection.py @@ -0,0 +1,50 @@ +from typing import Union + +from clickhouse_connect.dbapi.cursor import Cursor +from clickhouse_connect.driver import create_client +from clickhouse_connect.driver.query import QueryResult + + +class Connection: + """ + See :ref:`https://peps.python.org/pep-0249/` + """ + # pylint: disable=too-many-arguments + def __init__(self, + dsn: str = None, + username: str = '', + password: str = '', + host: str = None, + database: str = None, + interface: str = None, + port: int = 0, + secure: Union[bool, str] = False, + **kwargs): + self.client = create_client(host=host, + username=username, + password=password, + database=database, + interface=interface, + port=port, + secure=secure, + dsn=dsn, + generic_args=kwargs) + self.timezone = self.client.server_tz + + def close(self): + self.client.close() + + def commit(self): + pass + + def rollback(self): + pass + + def command(self, cmd: str): + return self.client.command(cmd) + + def raw_query(self, query: str) -> QueryResult: + return self.client.query(query) + + def cursor(self): + return Cursor(self.client) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/dbapi/cursor.py b/contrib/python/clickhouse-connect/clickhouse_connect/dbapi/cursor.py new file mode 100644 index 0000000000..b8f23452ac --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/dbapi/cursor.py @@ -0,0 +1,126 @@ +import logging +import re + +from typing import Optional, Sequence + +from clickhouse_connect.datatypes.registry import get_from_name +from clickhouse_connect.driver.common import unescape_identifier +from clickhouse_connect.driver.exceptions import ProgrammingError +from clickhouse_connect.driver import Client +from clickhouse_connect.driver.parser import parse_callable +from clickhouse_connect.driver.query import remove_sql_comments + +logger = logging.getLogger(__name__) + +insert_re = re.compile(r'^\s*INSERT\s+INTO\s+(.*$)', re.IGNORECASE) +str_type = get_from_name('String') +int_type = get_from_name('Int32') + + +class Cursor: + """ + See :ref:`https://peps.python.org/pep-0249/` + """ + + def __init__(self, client: Client): + self.client = client + self.arraysize = 1 + self.data: Optional[Sequence] = None + self.names = [] + self.types = [] + self._rowcount = 0 + self._ix = 0 + + def check_valid(self): + if self.data is None: + raise ProgrammingError('Cursor is not valid') + + @property + def description(self): + return [(n, t, None, None, None, None, True) for n, t in zip(self.names, self.types)] + + @property + def rowcount(self): + return self._rowcount + + def close(self): + self.data = None + + def execute(self, operation: str, parameters=None): + query_result = self.client.query(operation, parameters) + self.data = query_result.result_set + self._rowcount = len(self.data) + if 
query_result.column_names: + self.names = query_result.column_names + self.types = [x.name for x in query_result.column_types] + elif self.data: + self.names = [f'col_{x}' for x in range(len(self.data[0]))] + self.types = [x.__class__ for x in self.data[0]] + + def _try_bulk_insert(self, operation: str, data): + match = insert_re.match(remove_sql_comments(operation)) + if not match: + return False + temp = match.group(1) + table_end = min(temp.find(' '), temp.find('(')) + table = temp[:table_end].strip() + temp = temp[table_end:].strip() + if temp[0] == '(': + _, op_columns, temp = parse_callable(temp) + else: + op_columns = None + if 'VALUES' not in temp.upper(): + return False + col_names = list(data[0].keys()) + if op_columns and {unescape_identifier(x) for x in op_columns} != set(col_names): + return False # Data sent in doesn't match the columns in the insert statement + data_values = [list(row.values()) for row in data] + self.client.insert(table, data_values, col_names) + self.data = [] + return True + + def executemany(self, operation, parameters): + if not parameters or self._try_bulk_insert(operation, parameters): + return + self.data = [] + try: + for param_row in parameters: + query_result = self.client.query(operation, param_row) + self.data.extend(query_result.result_set) + if self.names or self.types: + if query_result.column_names != self.names: + logger.warning('Inconsistent column names %s : %s for operation %s in cursor executemany', + self.names, query_result.column_names, operation) + else: + self.names = query_result.column_names + self.types = query_result.column_types + except TypeError as ex: + raise ProgrammingError(f'Invalid parameters {parameters} passed to cursor executemany') from ex + self._rowcount = len(self.data) + + def fetchall(self): + self.check_valid() + ret = self.data + self._ix = self._rowcount + return ret + + def fetchone(self): + self.check_valid() + if self._ix >= self._rowcount: + return None + val = self.data[self._ix] + self._ix += 1 + return val + + def fetchmany(self, size: int = -1): + self.check_valid() + end = self._ix + max(size, self._rowcount - self._ix) + ret = self.data[self._ix: end] + self._ix = end + return ret + + def nextset(self): + raise NotImplementedError + + def callproc(self, *args, **kwargs): + raise NotImplementedError diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py new file mode 100644 index 0000000000..1320c04213 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py @@ -0,0 +1,118 @@ +from inspect import signature +from typing import Optional, Union, Dict, Any +from urllib.parse import urlparse, parse_qs + +import clickhouse_connect.driver.ctypes +from clickhouse_connect.driver.client import Client +from clickhouse_connect.driver.common import dict_copy +from clickhouse_connect.driver.exceptions import ProgrammingError +from clickhouse_connect.driver.httpclient import HttpClient + + +# pylint: disable=too-many-arguments,too-many-locals,too-many-branches +def create_client(*, + host: str = None, + username: str = None, + password: str = '', + database: str = '__default__', + interface: Optional[str] = None, + port: int = 0, + secure: Union[bool, str] = False, + dsn: Optional[str] = None, + settings: Optional[Dict[str, Any]] = None, + generic_args: Optional[Dict[str, Any]] = None, + **kwargs) -> Client: + """ + The preferred method to get a ClickHouse Connect Client 
instance + + :param host: The hostname or IP address of the ClickHouse server. If not set, localhost will be used. + :param username: The ClickHouse username. If not set, the default ClickHouse user will be used. + :param password: The password for username. + :param database: The default database for the connection. If not set, ClickHouse Connect will use the + default database for username. + :param interface: Must be http or https. Defaults to http, or to https if port is set to 8443 or 443 + :param port: The ClickHouse HTTP or HTTPS port. If not set will default to 8123, or to 8443 if secure=True + or interface=https. + :param secure: Use https/TLS. This overrides inferred values from the interface or port arguments. + :param dsn: A string in standard DSN (Data Source Name) format. Other connection values (such as host or user) + will be extracted from this string if not set otherwise. + :param settings: ClickHouse server settings to be used with the session/every request + :param generic_args: Used internally to parse DBAPI connection strings into keyword arguments and ClickHouse settings. + It is not recommended to use this parameter externally. + + :param kwargs -- Recognized keyword arguments (used by the HTTP client), see below + + :param compress: Enable compression for ClickHouse HTTP inserts and query results. True will select the preferred + compression method (lz4). A str of 'lz4', 'zstd', 'brotli', or 'gzip' can be used to use a specific compression type + :param query_limit: Default LIMIT on returned rows. 0 means no limit + :param connect_timeout: Timeout in seconds for the http connection + :param send_receive_timeout: Read timeout in seconds for http connection + :param client_name: client_name prepended to the HTTP User Agent header. Set this to track client queries + in the ClickHouse system.query_log. + :param send_progress: Deprecated, has no effect. Previous functionality is now automatically determined + :param verify: Verify the server certificate in secure/https mode + :param ca_cert: If verify is True, the file path to Certificate Authority root to validate ClickHouse server + certificate, in .pem format. Ignored if verify is False. This is not necessary if the ClickHouse server + certificate is trusted by the operating system. To trust the maintained list of "global" public root + certificates maintained by the Python 'certifi' package, set ca_cert to 'certifi' + :param client_cert: File path to a TLS Client certificate in .pem format. This file should contain any + applicable intermediate certificates + :param client_cert_key: File path to the private key for the Client Certificate. Required if the private key + is not included the Client Certificate key file + :param session_id ClickHouse session id. If not specified and the common setting 'autogenerate_session_id' + is True, the client will generate a UUID1 session id + :param pool_mgr Optional urllib3 PoolManager for this client. Useful for creating separate connection + pools for multiple client endpoints for applications with many clients + :param http_proxy http proxy address. Equivalent to setting the HTTP_PROXY environment variable + :param https_proxy https proxy address. Equivalent to setting the HTTPS_PROXY environment variable + :param server_host_name This is the server host name that will be checked against a TLS certificate for + validity. 
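An illustrative sketch of how the keyword arguments documented here can be combined; the host, credentials, and option values below are placeholders rather than recommended defaults.

```python
from clickhouse_connect.driver import create_client

# Hypothetical connection values; adjust for a real server.
client = create_client(
    host='localhost',
    username='default',
    password='',
    secure=False,            # True (or port 8443/443) switches to HTTPS
    compress='lz4',          # or True to let the driver choose
    query_limit=0,           # 0 disables the default LIMIT
    connect_timeout=10,
    send_receive_timeout=300,
)
print(client.ping())
```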
This option can be used if using an ssh_tunnel or other indirect means to an ClickHouse server + where the `host` argument refers to the tunnel or proxy and not the actual ClickHouse server + :return: ClickHouse Connect Client instance + """ + if dsn: + parsed = urlparse(dsn) + username = username or parsed.username + password = password or parsed.password + host = host or parsed.hostname + port = port or parsed.port + if parsed.path and (not database or database == '__default__'): + database = parsed.path[1:].split('/')[0] + database = database or parsed.path + kwargs.update(dict(parse_qs(parsed.query))) + use_tls = str(secure).lower() == 'true' or interface == 'https' or (not interface and port in (443, 8443)) + if not host: + host = 'localhost' + if not interface: + interface = 'https' if use_tls else 'http' + port = port or default_port(interface, use_tls) + if username is None and 'user' in kwargs: + username = kwargs.pop('user') + if username is None and 'user_name' in kwargs: + username = kwargs.pop('user_name') + if password and username is None: + username = 'default' + if 'compression' in kwargs and 'compress' not in kwargs: + kwargs['compress'] = kwargs.pop('compression') + settings = settings or {} + if interface.startswith('http'): + if generic_args: + client_params = signature(HttpClient).parameters + for name, value in generic_args.items(): + if name in client_params: + kwargs[name] = value + elif name == 'compression': + if 'compress' not in kwargs: + kwargs['compress'] = value + else: + if name.startswith('ch_'): + name = name[3:] + settings[name] = value + return HttpClient(interface, host, port, username, password, database, settings=settings, **kwargs) + raise ProgrammingError(f'Unrecognized client type {interface}') + + +def default_port(interface: str, secure: bool): + if interface.startswith('http'): + return 8443 if secure else 8123 + raise ValueError('Unrecognized ClickHouse interface') diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/buffer.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/buffer.py new file mode 100644 index 0000000000..b50b9bb678 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/buffer.py @@ -0,0 +1,140 @@ +import sys +import array +from typing import Any, Iterable + +from clickhouse_connect.driver.exceptions import StreamCompleteException +from clickhouse_connect.driver.types import ByteSource + +must_swap = sys.byteorder == 'big' + + +class ResponseBuffer(ByteSource): + slots = 'slice_sz', 'buf_loc', 'end', 'gen', 'buffer', 'slice' + + def __init__(self, source): + self.slice_sz = 4096 + self.buf_loc = 0 + self.buf_sz = 0 + self.source = source + self.gen = source.gen + self.buffer = bytes() + + def read_bytes(self, sz: int): + if self.buf_loc + sz <= self.buf_sz: + self.buf_loc += sz + return self.buffer[self.buf_loc - sz: self.buf_loc] + # Create a temporary buffer that bridges two or more source chunks + bridge = bytearray(self.buffer[self.buf_loc: self.buf_sz]) + self.buf_loc = 0 + self.buf_sz = 0 + while len(bridge) < sz: + chunk = next(self.gen, None) + if not chunk: + raise StreamCompleteException + x = len(chunk) + if len(bridge) + x <= sz: + bridge.extend(chunk) + else: + tail = sz - len(bridge) + bridge.extend(chunk[:tail]) + self.buffer = chunk + self.buf_sz = x + self.buf_loc = tail + return bridge + + def read_byte(self) -> int: + if self.buf_loc < self.buf_sz: + self.buf_loc += 1 + return self.buffer[self.buf_loc - 1] + self.buf_sz = 0 + self.buf_loc = 0 + chunk 
= next(self.gen, None) + if not chunk: + raise StreamCompleteException + x = len(chunk) + if x > 1: + self.buffer = chunk + self.buf_loc = 1 + self.buf_sz = x + return chunk[0] + + def read_leb128(self) -> int: + sz = 0 + shift = 0 + while True: + b = self.read_byte() + sz += ((b & 0x7f) << shift) + if (b & 0x80) == 0: + return sz + shift += 7 + + def read_leb128_str(self) -> str: + sz = self.read_leb128() + return self.read_bytes(sz).decode() + + def read_uint64(self) -> int: + return int.from_bytes(self.read_bytes(8), 'little', signed=False) + + def read_str_col(self, + num_rows: int, + encoding: str, + nullable: bool = False, + null_obj: Any = None) -> Iterable[str]: + column = [] + app = column.append + null_map = self.read_bytes(num_rows) if nullable else None + for ix in range(num_rows): + sz = 0 + shift = 0 + while True: + b = self.read_byte() + sz += ((b & 0x7f) << shift) + if (b & 0x80) == 0: + break + shift += 7 + x = self.read_bytes(sz) + if null_map and null_map[ix]: + app(null_obj) + elif encoding: + try: + app(x.decode(encoding)) + except UnicodeDecodeError: + app(x.hex()) + else: + app(x) + return column + + def read_bytes_col(self, sz: int, num_rows: int) -> Iterable[bytes]: + source = self.read_bytes(sz * num_rows) + return [bytes(source[x:x+sz]) for x in range(0, sz * num_rows, sz)] + + def read_fixed_str_col(self, sz: int, num_rows: int, encoding: str) -> Iterable[str]: + source = self.read_bytes(sz * num_rows) + column = [] + app = column.append + for ix in range(0, sz * num_rows, sz): + try: + app(str(source[ix: ix + sz], encoding).rstrip('\x00')) + except UnicodeDecodeError: + app(source[ix: ix + sz].hex()) + return column + + def read_array(self, array_type: str, num_rows: int) -> Iterable[Any]: + column = array.array(array_type) + sz = column.itemsize * num_rows + b = self.read_bytes(sz) + column.frombytes(b) + if must_swap: + column.byteswap() + return column + + @property + def last_message(self): + if len(self.buffer) == 0: + return None + return self.buffer.decode() + + def close(self): + if self.source: + self.source.close() + self.source = None diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py new file mode 100644 index 0000000000..beaea8e0d1 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py @@ -0,0 +1,727 @@ +import io +import logging +from datetime import tzinfo, datetime + +import pytz + +from abc import ABC, abstractmethod +from typing import Iterable, Optional, Any, Union, Sequence, Dict, Generator, BinaryIO +from pytz.exceptions import UnknownTimeZoneError + +from clickhouse_connect import common +from clickhouse_connect.common import version +from clickhouse_connect.datatypes.registry import get_from_name +from clickhouse_connect.datatypes.base import ClickHouseType +from clickhouse_connect.driver.common import dict_copy, StreamContext, coerce_int, coerce_bool +from clickhouse_connect.driver.constants import CH_VERSION_WITH_PROTOCOL, PROTOCOL_VERSION_WITH_LOW_CARD +from clickhouse_connect.driver.exceptions import ProgrammingError, OperationalError +from clickhouse_connect.driver.external import ExternalData +from clickhouse_connect.driver.insert import InsertContext +from clickhouse_connect.driver.summary import QuerySummary +from clickhouse_connect.driver.models import ColumnDef, SettingDef, SettingStatus +from clickhouse_connect.driver.query import QueryResult, to_arrow, QueryContext, arrow_buffer + 
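A standalone sketch of the unsigned LEB128 encoding that `ResponseBuffer.read_leb128` above decodes (seven payload bits per byte, with the high bit set while more bytes follow); these helper names are illustrative and not part of the driver.

```python
def leb128_encode(value: int) -> bytes:
    """Encode a non-negative int as unsigned LEB128."""
    out = bytearray()
    while True:
        b = value & 0x7f
        value >>= 7
        if value == 0:
            out.append(b)
            return bytes(out)
        out.append(0x80 | b)


def leb128_decode(buf: bytes) -> int:
    """Decode the unsigned LEB128 prefix of a byte string."""
    result, shift = 0, 0
    for b in buf:
        result += (b & 0x7f) << shift
        if (b & 0x80) == 0:
            break
        shift += 7
    return result


assert leb128_encode(300) == b'\xac\x02'
assert leb128_decode(b'\xac\x02') == 300
```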
+io.DEFAULT_BUFFER_SIZE = 1024 * 256 +logger = logging.getLogger(__name__) +arrow_str_setting = 'output_format_arrow_string_as_string' + + +# pylint: disable=too-many-public-methods, too-many-instance-attributes +class Client(ABC): + """ + Base ClickHouse Connect client + """ + compression: str = None + write_compression: str = None + protocol_version = 0 + valid_transport_settings = set() + optional_transport_settings = set() + database = None + max_error_message = 0 + + def __init__(self, + database: str, + query_limit: int, + uri: str, + query_retries: int, + server_host_name: Optional[str], + apply_server_timezone: Optional[Union[str, bool]]): + """ + Shared initialization of ClickHouse Connect client + :param database: database name + :param query_limit: default LIMIT for queries + :param uri: uri for error messages + """ + self.query_limit = coerce_int(query_limit) + self.query_retries = coerce_int(query_retries) + self.server_host_name = server_host_name + self.server_tz = pytz.UTC + self.server_version, server_tz = \ + tuple(self.command('SELECT version(), timezone()', use_database=False)) + try: + self.server_tz = pytz.timezone(server_tz) + except UnknownTimeZoneError: + logger.warning('Warning, server is using an unrecognized timezone %s, will use UTC default', server_tz) + offsets_differ = datetime.now().astimezone().utcoffset() != datetime.now(tz=self.server_tz).utcoffset() + self.apply_server_timezone = apply_server_timezone == 'always' or ( + coerce_bool(apply_server_timezone) and offsets_differ) + readonly = 'readonly' + if not self.min_version('19.17'): + readonly = common.get_setting('readonly') + server_settings = self.query(f'SELECT name, value, {readonly} as readonly FROM system.settings LIMIT 10000') + self.server_settings = {row['name']: SettingDef(**row) for row in server_settings.named_results()} + if database and not database == '__default__': + self.database = database + if self.min_version(CH_VERSION_WITH_PROTOCOL): + # Unfortunately we have to validate that the client protocol version is actually used by ClickHouse + # since the query parameter could be stripped off (in particular, by CHProxy) + test_data = self.raw_query('SELECT 1 AS check', fmt='Native', settings={ + 'client_protocol_version': PROTOCOL_VERSION_WITH_LOW_CARD + }) + if test_data[8:16] == b'\x01\x01\x05check': + self.protocol_version = PROTOCOL_VERSION_WITH_LOW_CARD + self.uri = uri + + def _validate_settings(self, settings: Optional[Dict[str, Any]]) -> Dict[str, str]: + """ + This strips any ClickHouse settings that are not recognized or are read only. 
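A hedged usage sketch of the per-query settings dictionary that passes through this validation; `max_block_size` is a standard ClickHouse setting chosen purely for illustration, and `client` is the instance from the earlier `create_client` sketch.

```python
result = client.query(
    'SELECT number FROM system.numbers LIMIT 10',
    settings={'max_block_size': 4096},   # validated and sent as a string
)
print(result.result_set[:3])
```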
+ :param settings: Dictionary of setting name and values + :return: A filtered dictionary of settings with values rendered as strings + """ + validated = {} + invalid_action = common.get_setting('invalid_setting_action') + for key, value in settings.items(): + str_value = self._validate_setting(key, value, invalid_action) + if str_value is not None: + validated[key] = value + return validated + + def _validate_setting(self, key: str, value: Any, invalid_action: str) -> Optional[str]: + if key not in self.valid_transport_settings: + setting_def = self.server_settings.get(key) + if setting_def is None or setting_def.readonly: + if key in self.optional_transport_settings: + return None + if invalid_action == 'send': + logger.warning('Attempting to send unrecognized or readonly setting %s', key) + elif invalid_action == 'drop': + logger.warning('Dropping unrecognized or readonly settings %s', key) + return None + else: + raise ProgrammingError(f'Setting {key} is unknown or readonly') from None + if isinstance(value, bool): + return '1' if value else '0' + return str(value) + + def _setting_status(self, key: str) -> SettingStatus: + comp_setting = self.server_settings.get(key) + if not comp_setting: + return SettingStatus(False, False) + return SettingStatus(comp_setting.value != '0', comp_setting.readonly != 1) + + def _prep_query(self, context: QueryContext): + if context.is_select and not context.has_limit and self.query_limit: + return f'{context.final_query}\n LIMIT {self.query_limit}' + return context.final_query + + def _check_tz_change(self, new_tz) -> Optional[tzinfo]: + if new_tz: + try: + new_tzinfo = pytz.timezone(new_tz) + if new_tzinfo != self.server_tz: + return new_tzinfo + except UnknownTimeZoneError: + logger.warning('Unrecognized timezone %s received from ClickHouse', new_tz) + return None + + @abstractmethod + def _query_with_context(self, context: QueryContext): + pass + + @abstractmethod + def set_client_setting(self, key, value): + """ + Set a clickhouse setting for the client after initialization. If a setting is not recognized by ClickHouse, + or the setting is identified as "read_only", this call will either throw a Programming exception or attempt + to send the setting anyway based on the common setting 'invalid_setting_action' + :param key: ClickHouse setting name + :param value: ClickHouse setting value + """ + + @abstractmethod + def get_client_setting(self, key) -> Optional[str]: + """ + :param key: The setting key + :return: The string value of the setting, if it exists, or None + """ + + # pylint: disable=too-many-arguments,unused-argument,too-many-locals + def query(self, + query: str = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + column_oriented: Optional[bool] = None, + use_numpy: Optional[bool] = None, + max_str_len: Optional[int] = None, + context: QueryContext = None, + query_tz: Optional[Union[str, tzinfo]] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + external_data: Optional[ExternalData] = None) -> QueryResult: + """ + Main query method for SELECT, DESCRIBE and other SQL statements that return a result matrix. 
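A minimal sketch of the query method, reusing the `client` from the earlier create_client sketch; the pyformat-style placeholder mirrors the DB API paramstyle declared in the dbapi module above, and the chosen setting name is arbitrary.

```python
result = client.query(
    'SELECT name, value FROM system.settings WHERE name = %(name)s',
    parameters={'name': 'max_threads'},
)
print(result.column_names)
for row in result.named_results():
    print(row['name'], row['value'])
```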
For + parameters, see the create_query_context method + :return: QueryResult -- data and metadata from response + """ + if query and query.lower().strip().startswith('select __connect_version__'): + return QueryResult([[f'ClickHouse Connect v.{version()} ⓒ ClickHouse Inc.']], None, + ('connect_version',), (get_from_name('String'),)) + kwargs = locals().copy() + del kwargs['self'] + query_context = self.create_query_context(**kwargs) + if query_context.is_command: + response = self.command(query, + parameters=query_context.parameters, + settings=query_context.settings, + external_data=query_context.external_data) + if isinstance(response, QuerySummary): + return response.as_query_result() + return QueryResult([response] if isinstance(response, list) else [[response]]) + return self._query_with_context(query_context) + + def query_column_block_stream(self, + query: str = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + context: QueryContext = None, + query_tz: Optional[Union[str, tzinfo]] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + external_data: Optional[ExternalData] = None) -> StreamContext: + """ + Variation of main query method that returns a stream of column oriented blocks. For + parameters, see the create_query_context method. + :return: StreamContext -- Iterable stream context that returns column oriented blocks + """ + return self._context_query(locals(), use_numpy=False, streaming=True).column_block_stream + + def query_row_block_stream(self, + query: str = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + context: QueryContext = None, + query_tz: Optional[Union[str, tzinfo]] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + external_data: Optional[ExternalData] = None) -> StreamContext: + """ + Variation of main query method that returns a stream of row oriented blocks. For + parameters, see the create_query_context method. + :return: StreamContext -- Iterable stream context that returns blocks of rows + """ + return self._context_query(locals(), use_numpy=False, streaming=True).row_block_stream + + def query_rows_stream(self, + query: str = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + context: QueryContext = None, + query_tz: Optional[Union[str, tzinfo]] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + external_data: Optional[ExternalData] = None) -> StreamContext: + """ + Variation of main query method that returns a stream of row oriented blocks. For + parameters, see the create_query_context method. 
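A hedged sketch of the streaming variant: the returned StreamContext (defined later in driver/common.py) must be consumed inside a `with` block. Reuses the `client` from the earlier sketch; the query is illustrative.

```python
with client.query_rows_stream('SELECT number FROM system.numbers LIMIT 100000') as stream:
    total = 0
    for row in stream:
        total += row[0]
print(total)
```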
+ :return: StreamContext -- Iterable stream context that returns blocks of rows + """ + return self._context_query(locals(), use_numpy=False, streaming=True).rows_stream + + @abstractmethod + def raw_query(self, query: str, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + fmt: str = None, + use_database: bool = True, + external_data: Optional[ExternalData] = None) -> bytes: + """ + Query method that simply returns the raw ClickHouse format bytes + :param query: Query statement/format string + :param parameters: Optional dictionary used to format the query + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param fmt: ClickHouse output format + :param use_database Send the database parameter to ClickHouse so the command will be executed in the client + database context. + :param external_data External data to send with the query + :return: bytes representing raw ClickHouse return value based on format + """ + + # pylint: disable=duplicate-code,too-many-arguments,unused-argument + def query_np(self, + query: str = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, str]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + max_str_len: Optional[int] = None, + context: QueryContext = None, + external_data: Optional[ExternalData] = None): + """ + Query method that returns the results as a numpy array. For parameter values, see the + create_query_context method + :return: Numpy array representing the result set + """ + return self._context_query(locals(), use_numpy=True).np_result + + # pylint: disable=duplicate-code,too-many-arguments,unused-argument + def query_np_stream(self, + query: str = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, str]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + max_str_len: Optional[int] = None, + context: QueryContext = None, + external_data: Optional[ExternalData] = None) -> StreamContext: + """ + Query method that returns the results as a stream of numpy arrays. For parameter values, see the + create_query_context method + :return: Generator that yield a numpy array per block representing the result set + """ + return self._context_query(locals(), use_numpy=True, streaming=True).np_stream + + # pylint: disable=duplicate-code,too-many-arguments,unused-argument + def query_df(self, + query: str = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, str]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + max_str_len: Optional[int] = None, + use_na_values: Optional[bool] = None, + query_tz: Optional[str] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + context: QueryContext = None, + external_data: Optional[ExternalData] = None, + use_extended_dtypes: Optional[bool] = None): + """ + Query method that results the results as a pandas dataframe. 
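A short sketch of the numpy and pandas variants, assuming the optional numpy/pandas extras are installed; reuses the `client` from the earlier sketch.

```python
df = client.query_df('SELECT number, number * 2 AS doubled FROM system.numbers LIMIT 1000')
print(df.dtypes)

arr = client.query_np('SELECT number FROM system.numbers LIMIT 1000')
print(arr.shape)
```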
For parameter values, see the + create_query_context method + :return: Pandas dataframe representing the result set + """ + return self._context_query(locals(), use_numpy=True, as_pandas=True).df_result + + # pylint: disable=duplicate-code,too-many-arguments,unused-argument + def query_df_stream(self, + query: str = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, str]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + max_str_len: Optional[int] = None, + use_na_values: Optional[bool] = None, + query_tz: Optional[str] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + context: QueryContext = None, + external_data: Optional[ExternalData] = None, + use_extended_dtypes: Optional[bool] = None) -> StreamContext: + """ + Query method that returns the results as a StreamContext. For parameter values, see the + create_query_context method + :return: Pandas dataframe representing the result set + """ + return self._context_query(locals(), use_numpy=True, + as_pandas=True, + streaming=True).df_stream + + def create_query_context(self, + query: str = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + column_oriented: Optional[bool] = None, + use_numpy: Optional[bool] = False, + max_str_len: Optional[int] = 0, + context: Optional[QueryContext] = None, + query_tz: Optional[Union[str, tzinfo]] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + use_na_values: Optional[bool] = None, + streaming: bool = False, + as_pandas: bool = False, + external_data: Optional[ExternalData] = None, + use_extended_dtypes: Optional[bool] = None) -> QueryContext: + """ + Creates or updates a reusable QueryContext object + :param query: Query statement/format string + :param parameters: Optional dictionary used to format the query + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param query_formats: See QueryContext __init__ docstring + :param column_formats: See QueryContext __init__ docstring + :param encoding: See QueryContext __init__ docstring + :param use_none: Use None for ClickHouse NULL instead of default values. Note that using None in Numpy + arrays will force the numpy array dtype to 'object', which is often inefficient. This effect also + will impact the performance of Pandas dataframes. + :param column_oriented: Deprecated. Controls orientation of the QueryResult result_set property + :param use_numpy: Return QueryResult columns as one-dimensional numpy arrays + :param max_str_len: Limit returned ClickHouse String values to this length, which allows a Numpy + structured array even with ClickHouse variable length String columns. If 0, Numpy arrays for + String columns will always be object arrays + :param context: An existing QueryContext to be updated with any provided parameter values + :param query_tz Either a string or a pytz tzinfo object. (Strings will be converted to tzinfo objects). + Values for any DateTime or DateTime64 column in the query will be converted to Python datetime.datetime + objects with the selected timezone. 
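A hedged sketch of reusing a QueryContext across calls, as described here; the query and parameter values are placeholders, and `client` is the instance from the earlier sketch.

```python
ctx = client.create_query_context(
    query='SELECT name, value FROM system.settings WHERE name = %(name)s',
    parameters={'name': 'max_threads'},
)
print(client.query(context=ctx).result_set)

# The same context can be reused with updated parameter values.
print(client.query(parameters={'name': 'max_block_size'}, context=ctx).result_set)
```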
+ :param column_tzs A dictionary of column names to tzinfo objects (or strings that will be converted to + tzinfo objects). The timezone will be applied to datetime objects returned in the query + :param use_na_values: Deprecated alias for use_advanced_dtypes + :param as_pandas Return the result columns as pandas.Series objects + :param streaming Marker used to correctly configure streaming queries + :param external_data ClickHouse "external data" to send with query + :param use_extended_dtypes: Only relevant to Pandas Dataframe queries. Use Pandas "missing types", such as + pandas.NA and pandas.NaT for ClickHouse NULL values, as well as extended Pandas dtypes such as IntegerArray + and StringArray. Defaulted to True for query_df methods + :return: Reusable QueryContext + """ + if context: + return context.updated_copy(query=query, + parameters=parameters, + settings=settings, + query_formats=query_formats, + column_formats=column_formats, + encoding=encoding, + server_tz=self.server_tz, + use_none=use_none, + column_oriented=column_oriented, + use_numpy=use_numpy, + max_str_len=max_str_len, + query_tz=query_tz, + column_tzs=column_tzs, + as_pandas=as_pandas, + use_extended_dtypes=use_extended_dtypes, + streaming=streaming, + external_data=external_data) + if use_numpy and max_str_len is None: + max_str_len = 0 + if use_extended_dtypes is None: + use_extended_dtypes = use_na_values + if as_pandas and use_extended_dtypes is None: + use_extended_dtypes = True + return QueryContext(query=query, + parameters=parameters, + settings=settings, + query_formats=query_formats, + column_formats=column_formats, + encoding=encoding, + server_tz=self.server_tz, + use_none=use_none, + column_oriented=column_oriented, + use_numpy=use_numpy, + max_str_len=max_str_len, + query_tz=query_tz, + column_tzs=column_tzs, + use_extended_dtypes=use_extended_dtypes, + as_pandas=as_pandas, + streaming=streaming, + apply_server_tz=self.apply_server_timezone, + external_data=external_data) + + def query_arrow(self, + query: str, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + use_strings: Optional[bool] = None, + external_data: Optional[ExternalData] = None): + """ + Query method using the ClickHouse Arrow format to return a PyArrow table + :param query: Query statement/format string + :param parameters: Optional dictionary used to format the query + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary) + :param external_data ClickHouse "external data" to send with query + :return: PyArrow.Table + """ + settings = dict_copy(settings) + if self.database: + settings['database'] = self.database + str_status = self._setting_status(arrow_str_setting) + if use_strings is None: + if str_status.is_writable and not str_status.is_set: + settings[arrow_str_setting] = '1' # Default to returning strings if possible + elif use_strings != str_status.is_set: + if not str_status.is_writable: + raise OperationalError(f'Cannot change readonly {arrow_str_setting} to {use_strings}') + settings[arrow_str_setting] = '1' if use_strings else '0' + return to_arrow(self.raw_query(query, + parameters, + settings, + fmt='Arrow', + external_data=external_data)) + + @abstractmethod + def command(self, + cmd: str, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + data: Union[str, bytes] = None, + settings: Dict[str, Any] = None, + use_database: bool = 
True, + external_data: Optional[ExternalData] = None) -> Union[str, int, Sequence[str], QuerySummary]: + """ + Client method that returns a single value instead of a result set + :param cmd: ClickHouse query/command as a python format string + :param parameters: Optional dictionary of key/values pairs to be formatted + :param data: Optional 'data' for the command (for INSERT INTO in particular) + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param use_database: Send the database parameter to ClickHouse so the command will be executed in the client + database context. Otherwise, no database will be specified with the command. This is useful for determining + the default user database + :param external_data ClickHouse "external data" to send with command/query + :return: Decoded response from ClickHouse as either a string, int, or sequence of strings, or QuerySummary + if no data returned + """ + + @abstractmethod + def ping(self) -> bool: + """ + Validate the connection, does not throw an Exception (see debug logs) + :return: ClickHouse server is up and reachable + """ + + # pylint: disable=too-many-arguments + def insert(self, + table: Optional[str] = None, + data: Sequence[Sequence[Any]] = None, + column_names: Union[str, Iterable[str]] = '*', + database: Optional[str] = None, + column_types: Sequence[ClickHouseType] = None, + column_type_names: Sequence[str] = None, + column_oriented: bool = False, + settings: Optional[Dict[str, Any]] = None, + context: InsertContext = None) -> QuerySummary: + """ + Method to insert multiple rows/data matrix of native Python objects. If context is specified arguments + other than data are ignored + :param table: Target table + :param data: Sequence of sequences of Python data + :param column_names: Ordered list of column names or '*' if column types should be retrieved from the + ClickHouse table definition + :param database: Target database -- will use client default database if not specified. + :param column_types: ClickHouse column types. If set then column data does not need to be retrieved from + the server + :param column_type_names: ClickHouse column type names. If set then column data does not need to be + retrieved from the server + :param column_oriented: If true the data is already "pivoted" in column form + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param context: Optional reusable insert context to allow repeated inserts into the same table with + different data batches + :return: QuerySummary with summary information, throws exception if insert fails + """ + if (context is None or context.empty) and data is None: + raise ProgrammingError('No data specified for insert') from None + if context is None: + context = self.create_insert_context(table, + column_names, + database, + column_types, + column_type_names, + column_oriented, + settings) + if data is not None: + if not context.empty: + raise ProgrammingError('Attempting to insert new data with non-empty insert context') from None + context.data = data + return self.data_insert(context) + + def insert_df(self, table: str = None, + df=None, + database: Optional[str] = None, + settings: Optional[Dict] = None, + column_names: Optional[Sequence[str]] = None, + column_types: Sequence[ClickHouseType] = None, + column_type_names: Sequence[str] = None, + context: InsertContext = None) -> QuerySummary: + """ + Insert a pandas DataFrame into ClickHouse. 
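A hedged sketch of the row-oriented insert and the DataFrame variant described here; `example_table` and its columns are hypothetical and assumed to already exist, and `client` is the instance from the earlier sketch.

```python
import pandas as pd

rows = [[1, 'alpha'], [2, 'beta']]
client.insert('example_table', rows, column_names=['id', 'name'])

df = pd.DataFrame({'id': [3, 4], 'name': ['gamma', 'delta']})
client.insert_df('example_table', df)
```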
If context is specified arguments other than df are ignored + :param table: ClickHouse table + :param df: two-dimensional pandas dataframe + :param database: Optional ClickHouse database + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param column_names: An optional list of ClickHouse column names. If not set, the DataFrame column names + will be used + :param column_types: ClickHouse column types. If set then column data does not need to be retrieved from + the server + :param column_type_names: ClickHouse column type names. If set then column data does not need to be + retrieved from the server + :param context: Optional reusable insert context to allow repeated inserts into the same table with + different data batches + :return: QuerySummary with summary information, throws exception if insert fails + """ + if context is None: + if column_names is None: + column_names = df.columns + elif len(column_names) != len(df.columns): + raise ProgrammingError('DataFrame column count does not match insert_columns') from None + return self.insert(table, + df, + column_names, + database, + column_types=column_types, + column_type_names=column_type_names, + settings=settings, context=context) + + def insert_arrow(self, table: str, + arrow_table, database: str = None, + settings: Optional[Dict] = None) -> QuerySummary: + """ + Insert a PyArrow table DataFrame into ClickHouse using raw Arrow format + :param table: ClickHouse table + :param arrow_table: PyArrow Table object + :param database: Optional ClickHouse database + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :return: QuerySummary with summary information, throws exception if insert fails + """ + full_table = table if '.' in table or not database else f'{database}.{table}' + column_names, insert_block = arrow_buffer(arrow_table) + return self.raw_insert(full_table, column_names, insert_block, settings, 'Arrow') + + def create_insert_context(self, + table: str, + column_names: Optional[Union[str, Sequence[str]]] = None, + database: Optional[str] = None, + column_types: Sequence[ClickHouseType] = None, + column_type_names: Sequence[str] = None, + column_oriented: bool = False, + settings: Optional[Dict[str, Any]] = None, + data: Optional[Sequence[Sequence[Any]]] = None) -> InsertContext: + """ + Builds a reusable insert context to hold state for a duration of an insert + :param table: Target table + :param database: Target database. If not set, uses the client default database + :param column_names: Optional ordered list of column names. If not set, all columns ('*') will be assumed + in the order specified by the table definition + :param database: Target database -- will use client default database if not specified + :param column_types: ClickHouse column types. Optional Sequence of ClickHouseType objects. If neither column + types nor column type names are set, actual column types will be retrieved from the server. + :param column_type_names: ClickHouse column type names. Specified column types by name string + :param column_oriented: If true the data is already "pivoted" in column form + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param data: Initial dataset for insert + :return Reusable insert context + """ + full_table = table if '.' 
in table or not database else f'{database}.{table}' + column_defs = [] + if column_types is None and column_type_names is None: + describe_result = self.query(f'DESCRIBE TABLE {full_table}') + column_defs = [ColumnDef(**row) for row in describe_result.named_results() + if row['default_type'] not in ('ALIAS', 'MATERIALIZED')] + if column_names is None or isinstance(column_names, str) and column_names == '*': + column_names = [cd.name for cd in column_defs] + column_types = [cd.ch_type for cd in column_defs] + elif isinstance(column_names, str): + column_names = [column_names] + if len(column_names) == 0: + raise ValueError('Column names must be specified for insert') + if not column_types: + if column_type_names: + column_types = [get_from_name(name) for name in column_type_names] + else: + column_map = {d.name: d for d in column_defs} + try: + column_types = [column_map[name].ch_type for name in column_names] + except KeyError as ex: + raise ProgrammingError(f'Unrecognized column {ex} in table {table}') from None + if len(column_names) != len(column_types): + raise ProgrammingError('Column names do not match column types') from None + return InsertContext(full_table, + column_names, + column_types, + column_oriented=column_oriented, + settings=settings, + data=data) + + def min_version(self, version_str: str) -> bool: + """ + Determine whether the connected server is at least the submitted version + For Altinity Stable versions like 22.8.15.25.altinitystable + the last condition in the first list comprehension expression is added + :param version_str: A version string consisting of up to 4 integers delimited by dots + :return: True if version_str is greater than the server_version, False if less than + """ + try: + server_parts = [int(x) for x in self.server_version.split('.') if x.isnumeric()] + server_parts.extend([0] * (4 - len(server_parts))) + version_parts = [int(x) for x in version_str.split('.')] + version_parts.extend([0] * (4 - len(version_parts))) + except ValueError: + logger.warning('Server %s or requested version %s does not match format of numbers separated by dots', + self.server_version, version_str) + return False + for x, y in zip(server_parts, version_parts): + if x > y: + return True + if x < y: + return False + return True + + @abstractmethod + def data_insert(self, context: InsertContext) -> QuerySummary: + """ + Subclass implementation of the data insert + :context: InsertContext parameter object + :return: No return, throws an exception if the insert fails + """ + + @abstractmethod + def raw_insert(self, table: str, + column_names: Optional[Sequence[str]] = None, + insert_block: Union[str, bytes, Generator[bytes, None, None], BinaryIO] = None, + settings: Optional[Dict] = None, + fmt: Optional[str] = None, + compression: Optional[str] = None) -> QuerySummary: + """ + Insert data already formatted in a bytes object + :param table: Table name (whether qualified with the database name or not) + :param column_names: Sequence of column names + :param insert_block: Binary or string data already in a recognized ClickHouse format + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param compression: Recognized ClickHouse `Accept-Encoding` header compression value + :param fmt: Valid clickhouse format + """ + + def close(self): + """ + Subclass implementation to close the connection to the server/deallocate the client + """ + + def _context_query(self, lcls: dict, **overrides): + kwargs = lcls.copy() + kwargs.pop('self') + 
kwargs.update(overrides) + return self._query_with_context((self.create_query_context(**kwargs))) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + self.close() diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/common.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/common.py new file mode 100644 index 0000000000..71adb00321 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/common.py @@ -0,0 +1,206 @@ +import array +import struct +import sys + +from typing import Sequence, MutableSequence, Dict, Optional, Union, Generator + +from clickhouse_connect.driver.exceptions import ProgrammingError, StreamClosedError, DataError +from clickhouse_connect.driver.types import Closable + +# pylint: disable=invalid-name +must_swap = sys.byteorder == 'big' +int_size = array.array('i').itemsize +low_card_version = 1 + +array_map = {1: 'b', 2: 'h', 4: 'i', 8: 'q'} +decimal_prec = {32: 9, 64: 18, 128: 38, 256: 79} + +if int_size == 2: + array_map[4] = 'l' + +array_sizes = {v: k for k, v in array_map.items()} +array_sizes['f'] = 4 +array_sizes['d'] = 8 +np_date_types = {0: '[s]', 3: '[ms]', 6: '[us]', 9: '[ns]'} + + +def array_type(size: int, signed: bool): + """ + Determines the Python array.array code for the requested byte size + :param size: byte size + :param signed: whether int types should be signed or unsigned + :return: Python array.array code + """ + try: + code = array_map[size] + except KeyError: + return None + return code if signed else code.upper() + + +def write_array(code: str, column: Sequence, dest: MutableSequence): + """ + Write a column of native Python data matching the array.array code + :param code: Python array.array code matching the column data type + :param column: Column of native Python values + :param dest: Destination byte buffer + """ + if len(column) and not isinstance(column[0], (int, float)): + if code in ('f', 'F', 'd', 'D'): + column = [float(x) for x in column] + else: + column = [int(x) for x in column] + try: + buff = struct.Struct(f'<{len(column)}{code}') + dest += buff.pack(*column) + except (TypeError, OverflowError, struct.error) as ex: + raise DataError('Unable to create Python array. 
This is usually caused by trying to insert None ' + + 'values into a ClickHouse column that is not Nullable') from ex + + +def write_uint64(value: int, dest: MutableSequence): + """ + Write a single UInt64 value to a binary write buffer + :param value: UInt64 value to write + :param dest: Destination byte buffer + """ + dest.extend(value.to_bytes(8, 'little')) + + +def write_leb128(value: int, dest: MutableSequence): + """ + Write a LEB128 encoded integer to a target binary buffer + :param value: Integer value (positive only) + :param dest: Target buffer + """ + while True: + b = value & 0x7f + value >>= 7 + if value == 0: + dest.append(b) + return + dest.append(0x80 | b) + + +def decimal_size(prec: int): + """ + Determine the bit size of a ClickHouse or Python Decimal needed to store a value of the requested precision + :param prec: Precision of the Decimal in total number of base 10 digits + :return: Required bit size + """ + if prec < 1 or prec > 79: + raise ArithmeticError(f'Invalid precision {prec} for ClickHouse Decimal type') + if prec < 10: + return 32 + if prec < 19: + return 64 + if prec < 39: + return 128 + return 256 + + +def unescape_identifier(x: str) -> str: + if x.startswith('`') and x.endswith('`'): + return x[1:-1] + return x + + +def dict_copy(source: Dict = None, update: Optional[Dict] = None) -> Dict: + copy = source.copy() if source else {} + if update: + copy.update(update) + return copy + + +def empty_gen(): + yield from () + + +def coerce_int(val: Optional[Union[str, int]]) -> int: + if not val: + return 0 + return int(val) + + +def coerce_bool(val: Optional[Union[str, bool]]): + if not val: + return False + return val in (True, 'True', 'true', '1') + + +class SliceView(Sequence): + """ + Provides a view into a sequence rather than copying. Borrows liberally from + https://gist.github.com/mathieucaroff/0cf094325fb5294fb54c6a577f05a2c1 + Also see the discussion on SO: https://stackoverflow.com/questions/3485475/can-i-create-a-view-on-a-python-list + """ + slots = ('_source', '_range') + + def __init__(self, source: Sequence, source_slice: Optional[slice] = None): + if isinstance(source, SliceView): + self._source = source._source + self._range = source._range[source_slice] + else: + self._source = source + if source_slice is None: + self._range = range(len(source)) + else: + self._range = range(len(source))[source_slice] + + def __len__(self): + return len(self._range) + + def __getitem__(self, i): + if isinstance(i, slice): + return SliceView(self._source, i) + return self._source[self._range[i]] + + def __str__(self): + r = self._range + return str(self._source[slice(r.start, r.stop, r.step)]) + + def __repr__(self): + r = self._range + return f'SliceView({self._source[slice(r.start, r.stop, r.step)]})' + + def __eq__(self, other): + if self is other: + return True + if len(self) != len(other): + return False + for v, w in zip(self, other): + if v != w: + return False + return True + + +class StreamContext: + """ + Wraps a generator and its "source" in a Context. 
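A standalone sketch exercising the SliceView helper defined just above: it indexes into the original sequence through a range object instead of copying the data; the values are arbitrary.

```python
from clickhouse_connect.driver.common import SliceView

data = list(range(10))
view = SliceView(data, slice(2, 8))
print(len(view), view[0], view[-1])   # 6 2 7
print(view == [2, 3, 4, 5, 6, 7])     # True -- element-wise comparison, no copy made
```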
This ensures that the source will be "closed" even if the + generator is not fully consumed or there is an exception during consumption + """ + __slots__ = 'source', 'gen', '_in_context' + + def __init__(self, source: Closable, gen: Generator): + self.source = source + self.gen = gen + self._in_context = False + + def __iter__(self): + return self + + def __next__(self): + if not self._in_context: + raise ProgrammingError('Stream should be used within a context') + return next(self.gen) + + def __enter__(self): + if not self.gen: + raise StreamClosedError + self._in_context = True + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._in_context = False + self.source.close() + self.gen = None diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/compression.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/compression.py new file mode 100644 index 0000000000..db69ae3f04 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/compression.py @@ -0,0 +1,77 @@ +import zlib +from abc import abstractmethod +from typing import Union + +import lz4 +import lz4.frame +import zstandard + +try: + import brotli +except ImportError: + brotli = None + + +available_compression = ['lz4', 'zstd'] + +if brotli: + available_compression.append('br') +available_compression.extend(['gzip', 'deflate']) + +comp_map = {} + + +class Compressor: + def __init_subclass__(cls, tag: str, thread_safe: bool = True): + comp_map[tag] = cls() if thread_safe else cls + + @abstractmethod + def compress_block(self, block) -> Union[bytes, bytearray]: + return block + + def flush(self): + pass + + +class GzipCompressor(Compressor, tag='gzip', thread_safe=False): + def __init__(self, level: int = 6, wbits: int = 31): + self.zlib_obj = zlib.compressobj(level=level, wbits=wbits) + + def compress_block(self, block): + return self.zlib_obj.compress(block) + + def flush(self): + return self.zlib_obj.flush() + + +class Lz4Compressor(Compressor, tag='lz4', thread_safe=False): + def __init__(self): + self.comp = lz4.frame.LZ4FrameCompressor() + + def compress_block(self, block): + output = self.comp.begin(len(block)) + output += self.comp.compress(block) + return output + self.comp.flush() + + +class ZstdCompressor(Compressor, tag='zstd'): + def compress_block(self, block): + return zstandard.compress(block) + + +class BrotliCompressor(Compressor, tag='br'): + def compress_block(self, block): + return brotli.compress(block) + + +null_compressor = Compressor() + + +def get_compressor(compression: str) -> Compressor: + if not compression: + return null_compressor + comp = comp_map[compression] + try: + return comp() + except TypeError: + return comp diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/constants.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/constants.py new file mode 100644 index 0000000000..a242e559b9 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/constants.py @@ -0,0 +1,2 @@ +PROTOCOL_VERSION_WITH_LOW_CARD = 54405 +CH_VERSION_WITH_PROTOCOL = '23.2.1.2537' diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/context.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/context.py new file mode 100644 index 0000000000..7984fbeebb --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/context.py @@ -0,0 +1,72 @@ +import logging +import re +from datetime import datetime +from typing import Optional, Dict, Union, Any + +import 
pytz + +logger = logging.getLogger(__name__) + +_empty_map = {} + + +# pylint: disable=too-many-instance-attributes +class BaseQueryContext: + local_tz: pytz.timezone + + def __init__(self, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + use_extended_dtypes: bool = False, + use_numpy: bool = False): + self.settings = settings or {} + if query_formats is None: + self.type_formats = _empty_map + else: + self.type_formats = {re.compile(type_name.replace('*', '.*'), re.IGNORECASE): fmt + for type_name, fmt in query_formats.items()} + if column_formats is None: + self.col_simple_formats = _empty_map + self.col_type_formats = _empty_map + else: + self.col_simple_formats = {col_name: fmt for col_name, fmt in column_formats.items() if + isinstance(fmt, str)} + self.col_type_formats = {} + for col_name, fmt in column_formats.items(): + if not isinstance(fmt, str): + self.col_type_formats[col_name] = {re.compile(type_name.replace('*', '.*'), re.IGNORECASE): fmt + for type_name, fmt in fmt.items()} + self.query_formats = query_formats or {} + self.column_formats = column_formats or {} + self.encoding = encoding + self.use_numpy = use_numpy + self.use_extended_dtypes = use_extended_dtypes + self._active_col_fmt = None + self._active_col_type_fmts = _empty_map + + def start_column(self, name: str): + self._active_col_fmt = self.col_simple_formats.get(name) + self._active_col_type_fmts = self.col_type_formats.get(name, _empty_map) + + def active_fmt(self, ch_type): + if self._active_col_fmt: + return self._active_col_fmt + for type_pattern, fmt in self._active_col_type_fmts.items(): + if type_pattern.match(ch_type): + return fmt + for type_pattern, fmt in self.type_formats.items(): + if type_pattern.match(ch_type): + return fmt + return None + + +def _init_context_cls(): + local_tz = datetime.now().astimezone().tzinfo + if local_tz.tzname(datetime.now()) in ('UTC', 'GMT', 'Universal', 'GMT-0', 'Zulu', 'Greenwich'): + local_tz = pytz.UTC + BaseQueryContext.local_tz = local_tz + + +_init_context_cls() diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/ctypes.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/ctypes.py new file mode 100644 index 0000000000..e7bb607e68 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/ctypes.py @@ -0,0 +1,49 @@ +import logging +import os + +import clickhouse_connect.driver.dataconv as pydc +import clickhouse_connect.driver.npconv as pync +from clickhouse_connect.driver.buffer import ResponseBuffer +from clickhouse_connect.driver.common import coerce_bool + +logger = logging.getLogger(__name__) + +RespBuffCls = ResponseBuffer +data_conv = pydc +numpy_conv = pync + + +# pylint: disable=import-outside-toplevel,global-statement + +def connect_c_modules(): + if not coerce_bool(os.environ.get('CLICKHOUSE_CONNECT_USE_C', True)): + logger.info('ClickHouse Connect C optimizations disabled') + return + + global RespBuffCls, data_conv + try: + from clickhouse_connect.driverc.buffer import ResponseBuffer as CResponseBuffer + import clickhouse_connect.driverc.dataconv as cdc + + data_conv = cdc + RespBuffCls = CResponseBuffer + logger.debug('Successfully imported ClickHouse Connect C data optimizations') + connect_numpy() + except ImportError as ex: + logger.warning('Unable to connect optimized C data functions [%s], falling back to pure Python', + str(ex)) + + +def 
connect_numpy(): + global numpy_conv + try: + import clickhouse_connect.driverc.npconv as cnc + + numpy_conv = cnc + logger.debug('Successfully import ClickHouse Connect C/Numpy optimizations') + except ImportError as ex: + logger.debug('Unable to connect ClickHouse Connect C to Numpy API [%s], falling back to pure Python', + str(ex)) + + +connect_c_modules() diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/dataconv.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/dataconv.py new file mode 100644 index 0000000000..29c96a9a66 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/dataconv.py @@ -0,0 +1,129 @@ +import array +from datetime import datetime, date, tzinfo +from ipaddress import IPv4Address +from typing import Sequence, Optional, Any +from uuid import UUID, SafeUUID + +from clickhouse_connect.driver.common import int_size +from clickhouse_connect.driver.types import ByteSource +from clickhouse_connect.driver.options import np + + +MONTH_DAYS = (0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334, 365) +MONTH_DAYS_LEAP = (0, 31, 60, 91, 121, 152, 182, 213, 244, 274, 305, 335, 366) + + +def read_ipv4_col(source: ByteSource, num_rows: int): + column = source.read_array('I', num_rows) + fast_ip_v4 = IPv4Address.__new__ + new_col = [] + app = new_col.append + for x in column: + ipv4 = fast_ip_v4(IPv4Address) + ipv4._ip = x # pylint: disable=protected-access + app(ipv4) + return new_col + + +def read_datetime_col(source: ByteSource, num_rows: int, tz_info: Optional[tzinfo]): + src_array = source.read_array('I', num_rows) + if tz_info is None: + fts = datetime.utcfromtimestamp + return [fts(ts) for ts in src_array] + fts = datetime.fromtimestamp + return [fts(ts, tz_info) for ts in src_array] + + +def epoch_days_to_date(days: int) -> date: + cycles400, rem = divmod(days + 134774, 146097) + cycles100, rem = divmod(rem, 36524) + cycles, rem = divmod(rem, 1461) + years, rem = divmod(rem, 365) + year = (cycles << 2) + cycles400 * 400 + cycles100 * 100 + years + 1601 + if years == 4 or cycles100 == 4: + return date(year - 1, 12, 31) + m_list = MONTH_DAYS_LEAP if years == 3 and (year == 2000 or year % 100 != 0) else MONTH_DAYS + month = (rem + 24) >> 5 + while rem < m_list[month]: + month -= 1 + return date(year, month + 1, rem + 1 - m_list[month]) + + +def read_date_col(source: ByteSource, num_rows: int): + column = source.read_array('H', num_rows) + return [epoch_days_to_date(x) for x in column] + + +def read_date32_col(source: ByteSource, num_rows: int): + column = source.read_array('l' if int_size == 2 else 'i', num_rows) + return [epoch_days_to_date(x) for x in column] + + +def read_uuid_col(source: ByteSource, num_rows: int): + v = source.read_array('Q', num_rows * 2) + empty_uuid = UUID(int=0) + new_uuid = UUID.__new__ + unsafe = SafeUUID.unsafe + oset = object.__setattr__ + column = [] + app = column.append + for i in range(num_rows): + ix = i << 1 + int_value = v[ix] << 64 | v[ix + 1] + if int_value == 0: + app(empty_uuid) + else: + fast_uuid = new_uuid(UUID) + oset(fast_uuid, 'int', int_value) + oset(fast_uuid, 'is_safe', unsafe) + app(fast_uuid) + return column + + +def read_nullable_array(source: ByteSource, array_type: str, num_rows: int, null_obj: Any): + null_map = source.read_bytes(num_rows) + column = source.read_array(array_type, num_rows) + return [null_obj if null_map[ix] else column[ix] for ix in range(num_rows)] + + +def build_nullable_column(source: Sequence, null_map: bytes, null_obj: Any): + return 
[source[ix] if null_map[ix] == 0 else null_obj for ix in range(len(source))] + + +def build_lc_nullable_column(index: Sequence, keys: array.array, null_obj: Any): + column = [] + for key in keys: + if key == 0: + column.append(null_obj) + else: + column.append(index[key]) + return column + + +def to_numpy_array(column: Sequence): + arr = np.empty((len(column),), dtype=np.object) + arr[:] = column + return arr + + +def pivot(data: Sequence[Sequence], start_row: int, end_row: int) -> Sequence[Sequence]: + return tuple(zip(*data[start_row: end_row])) + + +def write_str_col(column: Sequence, encoding: Optional[str], dest: bytearray): + app = dest.append + for x in column: + if not x: + app(0) + else: + if encoding: + x = x.encode(encoding) + sz = len(x) + while True: + b = sz & 0x7f + sz >>= 7 + if sz == 0: + app(b) + break + app(0x80 | b) + dest += x diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/ddl.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/ddl.py new file mode 100644 index 0000000000..a9a1a4b0aa --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/ddl.py @@ -0,0 +1,28 @@ +from typing import NamedTuple, Sequence + +from clickhouse_connect.datatypes.base import ClickHouseType + + +class TableColumnDef(NamedTuple): + """ + Simplified ClickHouse Table Column definition for DDL + """ + name: str + ch_type: ClickHouseType + expr_type: str = None + expr: str = None + + @property + def col_expr(self): + expr = f'{self.name} {self.ch_type.name}' + if self.expr_type: + expr += f' {self.expr_type} {self.expr}' + return expr + + +def create_table(table_name: str, columns: Sequence[TableColumnDef], engine: str, engine_params: dict): + stmt = f"CREATE TABLE {table_name} ({', '.join(col.col_expr for col in columns)}) ENGINE {engine} " + if engine_params: + for key, value in engine_params.items(): + stmt += f' {key} {value}' + return stmt diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/exceptions.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/exceptions.py new file mode 100644 index 0000000000..1cd41f9933 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/exceptions.py @@ -0,0 +1,84 @@ +""" +The driver exception classes here include all named exceptions required by th DB API 2.0 specification. It's not clear +how useful that naming convention is, but the convention is used for potential improved compatibility with other +libraries. 
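The create_table helper in ddl.py above simply concatenates column expressions and engine parameters into a DDL string; a small standalone sketch of its output (hypothetical table and column names, not taken from the driver or its tests):

```
from clickhouse_connect.driver.ddl import TableColumnDef, create_table
from clickhouse_connect.datatypes.registry import get_from_name

cols = [TableColumnDef('id', get_from_name('UInt32')),
        TableColumnDef('name', get_from_name('String'))]
ddl = create_table('test_table', cols, 'MergeTree', {'ORDER BY': 'id'})
# ddl is roughly: CREATE TABLE test_table (id UInt32, name String) ENGINE MergeTree  ORDER BY id
```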
In most cases docstring are taken from the DBIApi 2.0 documentation +""" + + +class ClickHouseError(Exception): + """Exception related to operation with ClickHouse.""" + + +# pylint: disable=redefined-builtin +class Warning(Warning, ClickHouseError): + """Exception raised for important warnings like data truncations + while inserting, etc.""" + + +class Error(ClickHouseError): + """Exception that is the base class of all other error exceptions + (not Warning).""" + + +class InterfaceError(Error): + """Exception raised for errors that are related to the database + interface rather than the database itself.""" + + +class DatabaseError(Error): + """Exception raised for errors that are related to the + database.""" + + +class DataError(DatabaseError): + """Exception raised for errors that are due to problems with the + processed data like division by zero, numeric value out of range, + etc.""" + + +class OperationalError(DatabaseError): + """Exception raised for errors that are related to the database's + operation and not necessarily under the control of the programmer, + e.g. an unexpected disconnect occurs, the data source name is not + found, a transaction could not be processed, a memory allocation + error occurred during processing, etc.""" + + +class IntegrityError(DatabaseError): + """Exception raised when the relational integrity of the database + is affected, e.g. a foreign key check fails, duplicate key, + etc.""" + + +class InternalError(DatabaseError): + """Exception raised when the database encounters an internal + error, e.g. the cursor is not valid anymore, the transaction is + out of sync, etc.""" + + +class ProgrammingError(DatabaseError): + """Exception raised for programming errors, e.g. table not found + or already exists, syntax error in the SQL statement, wrong number + of parameters specified, etc.""" + + +class NotSupportedError(DatabaseError): + """Exception raised in case a method or database API was used + which is not supported by the database, e.g. 
requesting a + .rollback() on a connection that does not support transaction or + has transactions turned off.""" + + +class StreamClosedError(ProgrammingError): + """Exception raised when a stream operation is executed on a closed stream.""" + + def __init__(self): + super().__init__('Executing a streaming operation on a closed stream') + + +class StreamCompleteException(Exception): + """ Internal exception used to indicate the end of a ClickHouse query result stream.""" + + +class StreamFailureError(Exception): + """ Stream failed unexpectedly """ diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/external.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/external.py new file mode 100644 index 0000000000..2d34f71ba8 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/external.py @@ -0,0 +1,127 @@ +import logging +from typing import Optional, Sequence, Dict, Union +from pathlib import Path + +from clickhouse_connect.driver.exceptions import ProgrammingError + +logger = logging.getLogger(__name__) + + +class ExternalFile: + # pylint: disable=too-many-branches + def __init__(self, + file_path: Optional[str] = None, + file_name: Optional[str] = None, + data: Optional[bytes] = None, + fmt: Optional[str] = None, + types: Optional[Union[str, Sequence[str]]] = None, + structure: Optional[Union[str, Sequence[str]]] = None, + mime_type: Optional[str] = None): + if file_path: + if data: + raise ProgrammingError('Only data or file_path should be specified for external data, not both') + try: + with open(file_path, 'rb') as file: + self.data = file.read() + except OSError as ex: + raise ProgrammingError(f'Failed to open file {file_path} for external data') from ex + path_name = Path(file_path).name + path_base = path_name.rsplit('.', maxsplit=1)[0] + if not file_name: + self.name = path_base + self.file_name = path_name + else: + self.name = file_name.rsplit('.', maxsplit=1)[0] + self.file_name = file_name + if file_name != path_name and path_base != self.name: + logger.warning('External data name %s and file_path %s use different names', file_name, path_name) + elif data: + if not file_name: + raise ProgrammingError('Name is required for query external data') + self.data = data + self.name = file_name.rsplit('.', maxsplit=1)[0] + self.file_name = file_name + else: + raise ProgrammingError('Either data or file_path must be specified for external data') + if types: + if structure: + raise ProgrammingError('Only types or structure should be specified for external data, not both') + self.structure = None + if isinstance(types, str): + self.types = types + else: + self.types = ','.join(types) + elif structure: + self.types = None + if isinstance(structure, str): + self.structure = structure + else: + self.structure = ','.join(structure) + self.fmt = fmt + self.mime_type = mime_type or 'application/octet-stream' + + @property + def form_data(self) -> tuple: + return self.file_name, self.data, self.mime_type + + @property + def query_params(self) -> Dict[str, str]: + params = {} + for name, value in (('format', self.fmt), + ('structure', self.structure), + ('types', self.types)): + if value: + params[f'{self.name}_{name}'] = value + return params + + +class ExternalData: + def __init__(self, + file_path: Optional[str] = None, + file_name: Optional[str] = None, + data: Optional[bytes] = None, + fmt: Optional[str] = None, + types: Optional[Union[str, Sequence[str]]] = None, + structure: Optional[Union[str, Sequence[str]]] = None, + mime_type: 
Optional[str] = None): + self.files: list[ExternalFile] = [] + if file_path or data: + first_file = ExternalFile(file_path=file_path, + file_name=file_name, + data=data, + fmt=fmt, + types=types, + structure=structure, + mime_type=mime_type) + self.files.append(first_file) + + def add_file(self, + file_path: Optional[str] = None, + file_name: Optional[str] = None, + data: Optional[bytes] = None, + fmt: Optional[str] = None, + types: Optional[Union[str, Sequence[str]]] = None, + structure: Optional[Union[str, Sequence[str]]] = None, + mime_type: Optional[str] = None): + self.files.append(ExternalFile(file_path=file_path, + file_name=file_name, + data=data, + fmt=fmt, + types=types, + structure=structure, + mime_type=mime_type)) + + @property + def form_data(self) -> Dict[str, tuple]: + if not self.files: + raise ProgrammingError('No external files set for external data') + return {file.name: file.form_data for file in self.files} + + @property + def query_params(self) -> Dict[str, str]: + if not self.files: + raise ProgrammingError('No external files set for external data') + params = {} + for file in self.files: + params.update(file.query_params) + return params diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py new file mode 100644 index 0000000000..c8fa9e6116 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py @@ -0,0 +1,473 @@ +import json +import logging +import re +import uuid +from base64 import b64encode +from typing import Optional, Dict, Any, Sequence, Union, List, Callable, Generator, BinaryIO +from urllib.parse import urlencode + +from urllib3 import Timeout +from urllib3.exceptions import HTTPError +from urllib3.poolmanager import PoolManager +from urllib3.response import HTTPResponse + +from clickhouse_connect import common +from clickhouse_connect.datatypes import registry +from clickhouse_connect.datatypes.base import ClickHouseType +from clickhouse_connect.driver.ctypes import RespBuffCls +from clickhouse_connect.driver.client import Client +from clickhouse_connect.driver.common import dict_copy, coerce_bool, coerce_int +from clickhouse_connect.driver.compression import available_compression +from clickhouse_connect.driver.exceptions import DatabaseError, OperationalError, ProgrammingError +from clickhouse_connect.driver.external import ExternalData +from clickhouse_connect.driver.httputil import ResponseSource, get_pool_manager, get_response_data, \ + default_pool_manager, get_proxy_manager, all_managers, check_env_proxy, check_conn_reset +from clickhouse_connect.driver.insert import InsertContext +from clickhouse_connect.driver.summary import QuerySummary +from clickhouse_connect.driver.query import QueryResult, QueryContext, quote_identifier, bind_query +from clickhouse_connect.driver.transform import NativeTransform + +logger = logging.getLogger(__name__) +columns_only_re = re.compile(r'LIMIT 0\s*$', re.IGNORECASE) + + +# pylint: disable=too-many-instance-attributes +class HttpClient(Client): + params = {} + valid_transport_settings = {'database', 'buffer_size', 'session_id', + 'compress', 'decompress', 'session_timeout', + 'session_check', 'query_id', 'quota_key', + 'wait_end_of_query', 'client_protocol_version'} + optional_transport_settings = {'send_progress_in_http_headers', + 'http_headers_progress_interval_ms', + 'enable_http_compression'} + _owns_pool_manager = False + + # pylint: 
disable=too-many-arguments,too-many-locals,too-many-branches,too-many-statements,unused-argument + def __init__(self, + interface: str, + host: str, + port: int, + username: str, + password: str, + database: str, + compress: Union[bool, str] = True, + query_limit: int = 0, + query_retries: int = 2, + connect_timeout: int = 10, + send_receive_timeout: int = 300, + client_name: Optional[str] = None, + verify: bool = True, + ca_cert: Optional[str] = None, + client_cert: Optional[str] = None, + client_cert_key: Optional[str] = None, + session_id: Optional[str] = None, + settings: Optional[Dict[str, Any]] = None, + pool_mgr: Optional[PoolManager] = None, + http_proxy: Optional[str] = None, + https_proxy: Optional[str] = None, + server_host_name: Optional[str] = None, + apply_server_timezone: Optional[Union[str, bool]] = True): + """ + Create an HTTP ClickHouse Connect client + See clickhouse_connect.get_client for parameters + """ + self.url = f'{interface}://{host}:{port}' + self.headers = {} + ch_settings = settings or {} + self.http = pool_mgr + if interface == 'https': + if not https_proxy: + https_proxy = check_env_proxy('https', host, port) + if client_cert: + if not username: + raise ProgrammingError('username parameter is required for Mutual TLS authentication') + self.headers['X-ClickHouse-User'] = username + self.headers['X-ClickHouse-SSL-Certificate-Auth'] = 'on' + verify = coerce_bool(verify) + # pylint: disable=too-many-boolean-expressions + if not self.http and (server_host_name or ca_cert or client_cert or not verify or https_proxy): + options = { + 'ca_cert': ca_cert, + 'client_cert': client_cert, + 'verify': verify, + 'client_cert_key': client_cert_key + } + if server_host_name: + if verify: + options['assert_hostname'] = server_host_name + options['server_hostname'] = server_host_name + self.http = get_pool_manager(https_proxy=https_proxy, **options) + self._owns_pool_manager = True + if not self.http: + if not http_proxy: + http_proxy = check_env_proxy('http', host, port) + if http_proxy: + self.http = get_proxy_manager(host, http_proxy) + else: + self.http = default_pool_manager() + + if not client_cert and username: + self.headers['Authorization'] = 'Basic ' + b64encode(f'{username}:{password}'.encode()).decode() + self.headers['User-Agent'] = common.build_client_name(client_name) + self._read_format = self._write_format = 'Native' + self._transform = NativeTransform() + + connect_timeout, send_receive_timeout = coerce_int(connect_timeout), coerce_int(send_receive_timeout) + self.timeout = Timeout(connect=connect_timeout, read=send_receive_timeout) + self.http_retries = 1 + self._send_progress = None + self._send_comp_setting = False + self._progress_interval = None + self._active_session = None + + if session_id: + ch_settings['session_id'] = session_id + elif 'session_id' not in ch_settings and common.get_setting('autogenerate_session_id'): + ch_settings['session_id'] = str(uuid.uuid4()) + + if coerce_bool(compress): + compression = ','.join(available_compression) + self.write_compression = available_compression[0] + elif compress and compress not in ('False', 'false', '0'): + if compress not in available_compression: + raise ProgrammingError(f'Unsupported compression method {compress}') + compression = compress + self.write_compression = compress + else: + compression = None + + super().__init__(database=database, + uri=self.url, + query_limit=query_limit, + query_retries=query_retries, + server_host_name=server_host_name, + apply_server_timezone=apply_server_timezone) + 
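As a rough illustration of how the compress argument handled just above is resolved (a sketch that assumes the clickhouse_connect.get_client entry point referenced in the constructor docstring and a reachable local server, not driver documentation):

```
import clickhouse_connect

# compress=True advertises every codec in available_compression via Accept-Encoding
# ('lz4,zstd,br,gzip,deflate' when brotli is installed) and uses the first entry,
# lz4, as write_compression for inserts.
client = clickhouse_connect.get_client(host='localhost', compress=True)

# A specific codec can be requested instead; a value outside available_compression
# raises ProgrammingError('Unsupported compression method ...').
zstd_client = clickhouse_connect.get_client(host='localhost', compress='zstd')
```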
self.params = self._validate_settings(ch_settings) + comp_setting = self._setting_status('enable_http_compression') + self._send_comp_setting = not comp_setting.is_set and comp_setting.is_writable + if comp_setting.is_set or comp_setting.is_writable: + self.compression = compression + send_setting = self._setting_status('send_progress_in_http_headers') + self._send_progress = not send_setting.is_set and send_setting.is_writable + if (send_setting.is_set or send_setting.is_writable) and \ + self._setting_status('http_headers_progress_interval_ms').is_writable: + self._progress_interval = str(min(120000, max(10000, (send_receive_timeout - 5) * 1000))) + + def set_client_setting(self, key, value): + str_value = self._validate_setting(key, value, common.get_setting('invalid_setting_action')) + if str_value is not None: + self.params[key] = str_value + + def get_client_setting(self, key) -> Optional[str]: + values = self.params.get(key) + return values[0] if values else None + + def _prep_query(self, context: QueryContext): + final_query = super()._prep_query(context) + if context.is_insert: + return final_query + return f'{final_query}\n FORMAT {self._write_format}' + + def _query_with_context(self, context: QueryContext) -> QueryResult: + headers = {} + params = {} + if self.database: + params['database'] = self.database + if self.protocol_version: + params['client_protocol_version'] = self.protocol_version + context.block_info = True + params.update(context.bind_params) + params.update(self._validate_settings(context.settings)) + if columns_only_re.search(context.uncommented_query): + response = self._raw_request(f'{context.final_query}\n FORMAT JSON', + params, headers, retries=self.query_retries) + json_result = json.loads(response.data) + # ClickHouse will respond with a JSON object of meta, data, and some other objects + # We just grab the column names and column types from the metadata sub object + names: List[str] = [] + types: List[ClickHouseType] = [] + for col in json_result['meta']: + names.append(col['name']) + types.append(registry.get_from_name(col['type'])) + return QueryResult([], None, tuple(names), tuple(types)) + + if self.compression: + headers['Accept-Encoding'] = self.compression + if self._send_comp_setting: + params['enable_http_compression'] = '1' + final_query = self._prep_query(context) + if context.external_data: + body = bytes() + params['query'] = final_query + params.update(context.external_data.query_params) + fields = context.external_data.form_data + else: + body = final_query + fields = None + headers['Content-Type'] = 'text/plain; charset=utf-8' + response = self._raw_request(body, + params, + headers, + stream=True, + retries=self.query_retries, + fields=fields, + server_wait=not context.streaming) + byte_source = RespBuffCls(ResponseSource(response)) # pylint: disable=not-callable + context.set_response_tz(self._check_tz_change(response.headers.get('X-ClickHouse-Timezone'))) + query_result = self._transform.parse_response(byte_source, context) + query_result.summary = self._summary(response) + return query_result + + def data_insert(self, context: InsertContext) -> QuerySummary: + """ + See BaseClient doc_string for this method + """ + if context.empty: + logger.debug('No data included in insert, skipping') + return QuerySummary() + + def error_handler(resp: HTTPResponse): + # If we actually had a local exception when building the insert, throw that instead + if context.insert_exception: + ex = context.insert_exception + context.insert_exception = None + 
raise ex + self._error_handler(resp) + + headers = {'Content-Type': 'application/octet-stream'} + if context.compression is None: + context.compression = self.write_compression + if context.compression: + headers['Content-Encoding'] = context.compression + block_gen = self._transform.build_insert(context) + + params = {} + if self.database: + params['database'] = self.database + params.update(self._validate_settings(context.settings)) + + response = self._raw_request(block_gen, params, headers, error_handler=error_handler, server_wait=False) + logger.debug('Context insert response code: %d, content: %s', response.status, response.data) + context.data = None + return QuerySummary(self._summary(response)) + + def raw_insert(self, table: str = None, + column_names: Optional[Sequence[str]] = None, + insert_block: Union[str, bytes, Generator[bytes, None, None], BinaryIO] = None, + settings: Optional[Dict] = None, + fmt: Optional[str] = None, + compression: Optional[str] = None) -> QuerySummary: + """ + See BaseClient doc_string for this method + """ + params = {} + headers = {'Content-Type': 'application/octet-stream'} + if compression: + headers['Content-Encoding'] = compression + if table: + cols = f" ({', '.join([quote_identifier(x) for x in column_names])})" if column_names is not None else '' + query = f'INSERT INTO {table}{cols} FORMAT {fmt if fmt else self._write_format}' + if not compression and isinstance(insert_block, str): + insert_block = query + '\n' + insert_block + elif not compression and isinstance(insert_block, (bytes, bytearray, BinaryIO)): + insert_block = (query + '\n').encode() + insert_block + else: + params['query'] = query + if self.database: + params['database'] = self.database + params.update(self._validate_settings(settings or {})) + response = self._raw_request(insert_block, params, headers, server_wait=False) + logger.debug('Raw insert response code: %d, content: %s', response.status, response.data) + return QuerySummary(self._summary(response)) + + @staticmethod + def _summary(response: HTTPResponse): + summary = {} + if 'X-ClickHouse-Summary' in response.headers: + try: + summary = json.loads(response.headers['X-ClickHouse-Summary']) + except json.JSONDecodeError: + pass + summary['query_id'] = response.headers.get('X-ClickHouse-Query-Id', '') + return summary + + def command(self, + cmd, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + data: Union[str, bytes] = None, + settings: Optional[Dict] = None, + use_database: int = True, + external_data: Optional[ExternalData] = None) -> Union[str, int, Sequence[str], QuerySummary]: + """ + See BaseClient doc_string for this method + """ + cmd, params = bind_query(cmd, parameters, self.server_tz) + headers = {} + payload = None + fields = None + if external_data: + if data: + raise ProgrammingError('Cannot combine command data with external data') from None + fields = external_data.form_data + params.update(external_data.query_params) + elif isinstance(data, str): + headers['Content-Type'] = 'text/plain; charset=utf-8' + payload = data.encode() + elif isinstance(data, bytes): + headers['Content-Type'] = 'application/octet-stream' + payload = data + if payload is None and not cmd: + raise ProgrammingError('Command sent without query or recognized data') from None + if payload or fields: + params['query'] = cmd + else: + payload = cmd + if use_database and self.database: + params['database'] = self.database + params.update(self._validate_settings(settings or {})) + + method = 'POST' if payload or fields 
else 'GET' + response = self._raw_request(payload, params, headers, method, fields=fields) + if response.data: + try: + result = response.data.decode()[:-1].split('\t') + if len(result) == 1: + try: + return int(result[0]) + except ValueError: + return result[0] + return result + except UnicodeDecodeError: + return str(response.data) + return QuerySummary(self._summary(response)) + + def _error_handler(self, response: HTTPResponse, retried: bool = False) -> None: + err_str = f'HTTPDriver for {self.url} returned response code {response.status})' + try: + err_content = get_response_data(response) + except Exception: # pylint: disable=broad-except + err_content = None + finally: + response.close() + + if err_content: + err_msg = common.format_error(err_content.decode(errors='backslashreplace')) + err_str = f':{err_str}\n {err_msg}' + raise OperationalError(err_str) if retried else DatabaseError(err_str) from None + + def _raw_request(self, + data, + params: Dict[str, str], + headers: Optional[Dict[str, Any]] = None, + method: str = 'POST', + retries: int = 0, + stream: bool = False, + server_wait: bool = True, + fields: Optional[Dict[str, tuple]] = None, + error_handler: Callable = None) -> HTTPResponse: + if isinstance(data, str): + data = data.encode() + headers = dict_copy(self.headers, headers) + attempts = 0 + if server_wait: + params['wait_end_of_query'] = '1' + # We can't actually read the progress headers, but we enable them so ClickHouse sends something + # to keep the connection alive when waiting for long-running queries and (2) to get summary information + # if not streaming + if self._send_progress: + params['send_progress_in_http_headers'] = '1' + if self._progress_interval: + params['http_headers_progress_interval_ms'] = self._progress_interval + final_params = dict_copy(self.params, params) + url = f'{self.url}?{urlencode(final_params)}' + kwargs = { + 'headers': headers, + 'timeout': self.timeout, + 'retries': self.http_retries, + 'preload_content': not stream + } + if self.server_host_name: + kwargs['assert_same_host'] = False + kwargs['headers'].update({'Host': self.server_host_name}) + if fields: + kwargs['fields'] = fields + else: + kwargs['body'] = data + check_conn_reset(self.http) + query_session = final_params.get('session_id') + while True: + attempts += 1 + if query_session: + if query_session == self._active_session: + raise ProgrammingError('Attempt to execute concurrent queries within the same session.' + + 'Please use a separate client instance per thread/process.') + # There is a race condition here when using multiprocessing -- in that case the server will + # throw an error instead, but in most cases this more helpful error will be thrown first + self._active_session = query_session + try: + response = self.http.request(method, url, **kwargs) + except HTTPError as ex: + if isinstance(ex.__context__, ConnectionResetError): + # The server closed the connection, probably because the Keep Alive has expired + # We should be safe to retry, as ClickHouse should not have processed anything on a connection + # that it killed. 
We also only retry this once, as multiple disconnects are unlikely to be + # related to the Keep Alive settings + if attempts == 1: + logger.debug('Retrying remotely closed connection') + continue + logger.warning('Unexpected Http Driver Exception') + raise OperationalError(f'Error {ex} executing HTTP request attempt {attempts} {self.url}') from ex + finally: + if query_session: + self._active_session = None # Make sure we always clear this + if 200 <= response.status < 300: + return response + if response.status in (429, 503, 504): + if attempts > retries: + self._error_handler(response, True) + logger.debug('Retrying requests with status code %d', response.status) + elif error_handler: + error_handler(response) + else: + self._error_handler(response) + + def ping(self): + """ + See BaseClient doc_string for this method + """ + try: + response = self.http.request('GET', f'{self.url}/ping', timeout=3) + return 200 <= response.status < 300 + except HTTPError: + logger.debug('ping failed', exc_info=True) + return False + + def raw_query(self, query: str, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, fmt: str = None, + use_database: bool = True, external_data: Optional[ExternalData] = None) -> bytes: + """ + See BaseClient doc_string for this method + """ + final_query, bind_params = bind_query(query, parameters, self.server_tz) + if fmt: + final_query += f'\n FORMAT {fmt}' + params = self._validate_settings(settings or {}) + if use_database and self.database: + params['database'] = self.database + params.update(bind_params) + if external_data: + body = bytes() + params['query'] = final_query + params.update(external_data.query_params) + fields = external_data.form_data + else: + body = final_query + fields = None + return self._raw_request(body, params, fields=fields).data + + def close(self): + if self._owns_pool_manager: + self.http.clear() + all_managers.pop(self.http, None) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py new file mode 100644 index 0000000000..9bb8e26508 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py @@ -0,0 +1,226 @@ +import atexit +import http +import logging +import multiprocessing +import os +import sys +import socket +import time +from typing import Dict, Any, Optional + +import certifi +import lz4.frame +import urllib3 +import zstandard +from urllib3.poolmanager import PoolManager, ProxyManager +from urllib3.response import HTTPResponse + +from clickhouse_connect.driver.exceptions import ProgrammingError +from clickhouse_connect import common + +logger = logging.getLogger(__name__) + +# We disable this warning. 
Verify must explicitly set to false, so we assume the user knows what they're doing +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +# Increase this number just to be safe when ClickHouse is returning progress headers +http.client._MAXHEADERS = 10000 # pylint: disable=protected-access + +DEFAULT_KEEP_INTERVAL = 30 +DEFAULT_KEEP_COUNT = 3 +DEFAULT_KEEP_IDLE = 30 + +SOCKET_TCP = socket.IPPROTO_TCP + +core_socket_options = [ + (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), + (SOCKET_TCP, socket.TCP_NODELAY, 1), + (socket.SOL_SOCKET, socket.SO_SNDBUF, 1024 * 256), + (socket.SOL_SOCKET, socket.SO_SNDBUF, 1024 * 256) +] + +logging.getLogger('urllib3').setLevel(logging.WARNING) +_proxy_managers = {} +all_managers = {} + + +@atexit.register +def close_managers(): + for manager in all_managers: + manager.clear() + + +# pylint: disable=no-member,too-many-arguments,too-many-branches +def get_pool_manager_options(keep_interval: int = DEFAULT_KEEP_INTERVAL, + keep_count: int = DEFAULT_KEEP_COUNT, + keep_idle: int = DEFAULT_KEEP_IDLE, + ca_cert: str = None, + verify: bool = True, + client_cert: str = None, + client_cert_key: str = None, + **options) -> Dict[str, Any]: + socket_options = core_socket_options.copy() + if getattr(socket, 'TCP_KEEPINTVL', None) is not None: + socket_options.append((SOCKET_TCP, socket.TCP_KEEPINTVL, keep_interval)) + if getattr(socket, 'TCP_KEEPCNT', None) is not None: + socket_options.append((SOCKET_TCP, socket.TCP_KEEPCNT, keep_count)) + if getattr(socket, 'TCP_KEEPIDLE', None) is not None: + socket_options.append((SOCKET_TCP, socket.TCP_KEEPIDLE, keep_idle)) + if sys.platform == 'darwin': + socket_options.append((SOCKET_TCP, getattr(socket, 'TCP_KEEPALIVE', 0x10), keep_interval)) + options['maxsize'] = options.get('maxsize', 8) + options['retries'] = options.get('retries', 1) + if ca_cert == 'certifi': + ca_cert = certifi.where() + options['cert_reqs'] = 'CERT_REQUIRED' if verify else 'CERT_NONE' + if ca_cert: + options['ca_certs'] = ca_cert + if client_cert: + options['cert_file'] = client_cert + if client_cert_key: + options['key_file'] = client_cert_key + options['socket_options'] = socket_options + options['block'] = options.get('block', False) + return options + + +def get_pool_manager(keep_interval: int = DEFAULT_KEEP_INTERVAL, + keep_count: int = DEFAULT_KEEP_COUNT, + keep_idle: int = DEFAULT_KEEP_IDLE, + ca_cert: str = None, + verify: bool = True, + client_cert: str = None, + client_cert_key: str = None, + http_proxy: str = None, + https_proxy: str = None, + **options): + options = get_pool_manager_options(keep_interval, + keep_count, + keep_idle, + ca_cert, + verify, + client_cert, + client_cert_key, + **options) + if http_proxy: + if https_proxy: + raise ProgrammingError('Only one of http_proxy or https_proxy should be specified') + if not http_proxy.startswith('http'): + http_proxy = f'http://{http_proxy}' + manager = ProxyManager(http_proxy, **options) + elif https_proxy: + if not https_proxy.startswith('http'): + https_proxy = f'https://{https_proxy}' + manager = ProxyManager(https_proxy, **options) + else: + manager = PoolManager(**options) + all_managers[manager] = int(time.time()) + return manager + + +def check_conn_reset(manager: PoolManager): + reset_seconds = common.get_setting('max_connection_age') + if reset_seconds: + last_reset = all_managers.get(manager, 0) + now = int(time.time()) + if last_reset < now - reset_seconds: + logger.debug('connection reset') + manager.clear() + all_managers[manager] = now + + +def 
get_proxy_manager(host: str, http_proxy): + key = f'{host}__{http_proxy}' + if key in _proxy_managers: + return _proxy_managers[key] + proxy_manager = get_pool_manager(http_proxy=http_proxy) + _proxy_managers[key] = proxy_manager + return proxy_manager + + +def get_response_data(response: HTTPResponse) -> bytes: + encoding = response.headers.get('content-encoding', None) + if encoding == 'zstd': + try: + zstd_decom = zstandard.ZstdDecompressor() + return zstd_decom.stream_reader(response.data).read() + except zstandard.ZstdError: + pass + if encoding == 'lz4': + lz4_decom = lz4.frame.LZ4FrameDecompressor() + return lz4_decom.decompress(response.data, len(response.data)) + return response.data + + +def check_env_proxy(scheme: str, host: str, port: int) -> Optional[str]: + env_var = f'{scheme}_proxy'.lower() + proxy = os.environ.get(env_var) + if not proxy: + proxy = os.environ.get(env_var.upper()) + if not proxy: + return None + no_proxy = os.environ.get('no_proxy') + if not no_proxy: + no_proxy = os.environ.get('NO_PROXY') + if not no_proxy: + return proxy + if no_proxy == '*': + return None # Wildcard no proxy means don't actually proxy anything + host = host.lower() + for name in no_proxy.split(','): + name = name.strip() + if name: + name = name.lstrip('.').lower() + if name in (host, f'{host}:{port}'): + return None # Host or host/port matches + if host.endswith('.' + name): + return None # Domain matches + return proxy + + +_default_pool_manager = get_pool_manager() + + +def default_pool_manager(): + if multiprocessing.current_process().name == 'MainProcess': + return _default_pool_manager + # PoolManagers don't seem to be safe for some multiprocessing environments, always return a new one + return get_pool_manager() + + +class ResponseSource: + def __init__(self, response: HTTPResponse, chunk_size: int = 1024 * 1024): + self.response = response + compression = response.headers.get('content-encoding') + if compression == 'zstd': + zstd_decom = zstandard.ZstdDecompressor().decompressobj() + + def decompress(): + while True: + chunk = response.read(chunk_size, decode_content=False) + if not chunk: + break + yield zstd_decom.decompress(chunk) + + self.gen = decompress() + elif compression == 'lz4': + lz4_decom = lz4.frame.LZ4FrameDecompressor() + + def decompress(): + while lz4_decom.needs_input: + data = self.response.read(chunk_size, decode_content=False) + if lz4_decom.unused_data: + data = lz4_decom.unused_data + data + if not data: + return + chunk = lz4_decom.decompress(data) + if chunk: + yield chunk + + self.gen = decompress() + else: + self.gen = response.stream(amt=chunk_size, decode_content=True) + + def close(self): + self.response.drain_conn() + self.response.close() diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/insert.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/insert.py new file mode 100644 index 0000000000..c7861b3077 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/insert.py @@ -0,0 +1,199 @@ +import logging +from math import log +from typing import Iterable, Sequence, Optional, Any, Dict, NamedTuple, Generator, Union, TYPE_CHECKING + +from clickhouse_connect.driver.query import quote_identifier + +from clickhouse_connect.driver.ctypes import data_conv +from clickhouse_connect.driver.context import BaseQueryContext +from clickhouse_connect.driver.options import np, pd, pd_time_test +from clickhouse_connect.driver.exceptions import ProgrammingError + +if TYPE_CHECKING: + from 
clickhouse_connect.datatypes.base import ClickHouseType + +logger = logging.getLogger(__name__) +DEFAULT_BLOCK_BYTES = 1 << 21 # Try to generate blocks between 1MB and 2MB in raw size + + +class InsertBlock(NamedTuple): + prefix: bytes + column_count: int + row_count: int + column_names: Iterable[str] + column_types: Iterable['ClickHouseType'] + column_data: Iterable[Sequence[Any]] + + +# pylint: disable=too-many-instance-attributes +class InsertContext(BaseQueryContext): + """ + Reusable Argument/parameter object for inserts. + """ + + # pylint: disable=too-many-arguments + def __init__(self, + table: str, + column_names: Sequence[str], + column_types: Sequence['ClickHouseType'], + data: Any = None, + column_oriented: Optional[bool] = None, + settings: Optional[Dict[str, Any]] = None, + compression: Optional[Union[str, bool]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + block_size: Optional[int] = None): + super().__init__(settings, query_formats, column_formats) + self.table = table + self.column_names = column_names + self.column_types = column_types + self.column_oriented = False if column_oriented is None else column_oriented + self.compression = compression + self.req_block_size = block_size + self.block_row_count = DEFAULT_BLOCK_BYTES + self.data = data + self.insert_exception = None + + @property + def empty(self) -> bool: + return self._data is None + + @property + def data(self): + return self._raw_data + + @data.setter + def data(self, data: Any): + self._raw_data = data + self.current_block = 0 + self.current_row = 0 + self.row_count = 0 + self.column_count = 0 + self._data = None + if data is None or len(data) == 0: + return + if pd and isinstance(data, pd.DataFrame): + data = self._convert_pandas(data) + self.column_oriented = True + if np and isinstance(data, np.ndarray): + data = self._convert_numpy(data) + if self.column_oriented: + self._next_block_data = self._column_block_data + self._block_columns = data # [SliceView(column) for column in data] + self._block_rows = None + self.column_count = len(data) + self.row_count = len(data[0]) + else: + self._next_block_data = self._row_block_data + self._block_rows = data + self._block_columns = None + self.row_count = len(data) + self.column_count = len(data[0]) + if self.row_count and self.column_count: + if self.column_count != len(self.column_names): + raise ProgrammingError('Insert data column count does not match column names') + self._data = data + self.block_row_count = self._calc_block_size() + + def _calc_block_size(self) -> int: + if self.req_block_size: + return self.req_block_size + row_size = 0 + sample_size = min((log(self.row_count) + 1) * 2, 64) + sample_freq = max(1, int(self.row_count / sample_size)) + for i, d_type in enumerate(self.column_types): + if d_type.byte_size: + row_size += d_type.byte_size + continue + if self.column_oriented: + col_data = self._data[i] + if sample_freq == 1: + d_size = d_type.data_size(col_data) + else: + sample = [col_data[j] for j in range(0, self.row_count, sample_freq)] + d_size = d_type.data_size(sample) + else: + data = self._data + sample = [data[j][i] for j in range(0, self.row_count, sample_freq)] + d_size = d_type.data_size(sample) + row_size += d_size + return 1 << (21 - int(log(row_size, 2))) + + def next_block(self) -> Generator[InsertBlock, None, None]: + while True: + block_end = min(self.current_row + self.block_row_count, self.row_count) + row_count = block_end - 
self.current_row + if row_count <= 0: + return + if self.current_block == 0: + cols = f" ({', '.join([quote_identifier(x) for x in self.column_names])})" + prefix = f'INSERT INTO {self.table}{cols} FORMAT Native\n'.encode() + else: + prefix = bytes() + self.current_block += 1 + data = self._next_block_data(self.current_row, block_end) + yield InsertBlock(prefix, self.column_count, row_count, self.column_names, self.column_types, data) + self.current_row = block_end + + def _column_block_data(self, block_start, block_end): + if block_start == 0 and self.row_count <= block_end: + return self._block_columns # Optimization if we don't need to break up the block + return [col[block_start: block_end] for col in self._block_columns] + + def _row_block_data(self, block_start, block_end): + return data_conv.pivot(self._block_rows, block_start, block_end) + + def _convert_pandas(self, df): + data = [] + for df_col_name, col_name, ch_type in zip(df.columns, self.column_names, self.column_types): + df_col = df[df_col_name] + d_type = str(df_col.dtype) + if ch_type.python_type == int: + if 'float' in d_type: + df_col = df_col.round().astype(ch_type.base_type, copy=False) + else: + df_col = df_col.astype(ch_type.base_type, copy=False) + elif 'datetime' in ch_type.np_type and (pd_time_test(df_col) or 'datetime64[ns' in d_type): + div = ch_type.nano_divisor + data.append([None if pd.isnull(x) else x.value // div for x in df_col]) + self.column_formats[col_name] = 'int' + continue + if ch_type.nullable: + if d_type == 'object': + # This is ugly, but the multiple replaces seem required as a result of this bug: + # https://github.com/pandas-dev/pandas/issues/29024 + df_col = df_col.replace({pd.NaT: None}).replace({np.nan: None}) + elif 'Float' in ch_type.base_type: + # This seems to be the only way to convert any null looking things to nan + df_col = df_col.astype(ch_type.np_type) + else: + df_col = df_col.replace({np.nan: None}) + data.append(df_col.to_numpy(copy=False)) + return data + + def _convert_numpy(self, np_array): + if np_array.dtype.names is None: + if 'date' in str(np_array.dtype): + for col_name, col_type in zip(self.column_names, self.column_types): + if 'date' in col_type.np_type: + self.column_formats[col_name] = 'int' + return np_array.astype('int').tolist() + for col_type in self.column_types: + if col_type.byte_size == 0 or col_type.byte_size > np_array.dtype.itemsize: + return np_array.tolist() + return np_array + + if set(self.column_names).issubset(set(np_array.dtype.names)): + data = [np_array[col_name] for col_name in self.column_names] + else: + # Column names don't match, so we have to assume they are in order + data = [np_array[col_name] for col_name in np_array.dtype.names] + for ix, (col_name, col_type) in enumerate(zip(self.column_names, self.column_types)): + d_type = data[ix].dtype + if 'date' in str(d_type) and 'date' in col_type.np_type: + self.column_formats[col_name] = 'int' + data[ix] = data[ix].astype(int).tolist() + elif col_type.byte_size == 0 or col_type.byte_size > d_type.itemsize: + data[ix] = data[ix].tolist() + self.column_oriented = True + return data diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/models.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/models.py new file mode 100644 index 0000000000..054c7b686a --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/models.py @@ -0,0 +1,37 @@ +from typing import NamedTuple + +from clickhouse_connect.datatypes.registry import get_from_name + + 
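The _calc_block_size heuristic in insert.py above rounds the estimated row size down to a power of two and derives the rows-per-block count from DEFAULT_BLOCK_BYTES (1 << 21). A standalone sketch of the arithmetic, not part of the driver:

```
from math import log

def rows_per_block(estimated_row_bytes: int) -> int:
    # same expression as InsertContext._calc_block_size
    return 1 << (21 - int(log(estimated_row_bytes, 2)))

print(rows_per_block(40))    # 65536 rows, about 2.5 MiB of raw data per block
print(rows_per_block(1000))  # 4096 rows, about 3.9 MiB of raw data per block
```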
+class ColumnDef(NamedTuple): + """ + ClickHouse column definition from DESCRIBE TABLE command + """ + name: str + type: str + default_type: str + default_expression: str + comment: str + codec_expression: str + ttl_expression: str + + @property + def ch_type(self): + return get_from_name(self.type) + + +class SettingDef(NamedTuple): + """ + ClickHouse setting definition from system.settings table + """ + name: str + value: str + readonly: int + + +class SettingStatus(NamedTuple): + """ + Get the setting "status" from a ClickHouse server setting + """ + is_set: bool + is_writable: bool diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/npconv.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/npconv.py new file mode 100644 index 0000000000..df99550d34 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/npconv.py @@ -0,0 +1,9 @@ +from clickhouse_connect.driver.options import np + +from clickhouse_connect.driver.types import ByteSource + + +def read_numpy_array(source: ByteSource, np_type: str, num_rows: int): + dtype = np.dtype(np_type) + buffer = source.read_bytes(dtype.itemsize * num_rows) + return np.frombuffer(buffer, dtype, num_rows) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/npquery.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/npquery.py new file mode 100644 index 0000000000..1c063e4082 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/npquery.py @@ -0,0 +1,132 @@ +import logging +from typing import Generator, Sequence, Tuple + +from clickhouse_connect.driver.common import empty_gen, StreamContext +from clickhouse_connect.driver.exceptions import StreamClosedError +from clickhouse_connect.driver.types import Closable +from clickhouse_connect.driver.options import np, pd + +logger = logging.getLogger(__name__) + + +# pylint: disable=too-many-instance-attributes +class NumpyResult(Closable): + def __init__(self, + block_gen: Generator[Sequence, None, None] = None, + column_names: Tuple = (), + column_types: Tuple = (), + d_types: Sequence = (), + source: Closable = None): + self.column_names = column_names + self.column_types = column_types + self.np_types = d_types + self.source = source + self.query_id = '' + self.summary = {} + self._block_gen = block_gen or empty_gen() + self._numpy_result = None + self._df_result = None + + def _np_stream(self) -> Generator: + if self._block_gen is None: + raise StreamClosedError + + block_gen = self._block_gen + self._block_gen = None + if not self.np_types: + return block_gen + + d_types = self.np_types + first_type = d_types[0] + if first_type != np.object_ and all(np.dtype(np_type) == first_type for np_type in d_types): + self.np_types = first_type + + def numpy_blocks(): + for block in block_gen: + yield np.array(block, first_type).transpose() + else: + if any(x == np.object_ for x in d_types): + self.np_types = [np.object_] * len(self.np_types) + self.np_types = np.dtype(list(zip(self.column_names, d_types))) + + def numpy_blocks(): + for block in block_gen: + np_array = np.empty(len(block[0]), dtype=self.np_types) + for col_name, data in zip(self.column_names, block): + np_array[col_name] = data + yield np_array + + return numpy_blocks() + + def _df_stream(self) -> Generator: + if self._block_gen is None: + raise StreamClosedError + block_gen = self._block_gen + + def pd_blocks(): + for block in block_gen: + yield pd.DataFrame(dict(zip(self.column_names, block))) + + self._block_gen = None + return 
pd_blocks() + + def close_numpy(self): + if not self._block_gen: + raise StreamClosedError + chunk_size = 4 + pieces = [] + blocks = [] + for block in self._np_stream(): + blocks.append(block) + if len(blocks) == chunk_size: + pieces.append(np.concatenate(blocks, dtype=self.np_types)) + chunk_size *= 2 + blocks = [] + pieces.extend(blocks) + if len(pieces) > 1: + self._numpy_result = np.concatenate(pieces, dtype=self.np_types) + elif len(pieces) == 1: + self._numpy_result = pieces[0] + else: + self._numpy_result = np.empty((0,)) + self.close() + return self + + def close_df(self): + pieces = list(self._df_stream()) + if len(pieces) > 1: + self._df_result = pd.concat(pieces, ignore_index=True) + elif len(pieces) == 1: + self._df_result = pieces[0] + else: + self._df_result = pd.DataFrame() + self.close() + return self + + @property + def np_result(self): + if self._numpy_result is None: + self.close_numpy() + return self._numpy_result + + @property + def df_result(self): + if self._df_result is None: + self.close_df() + return self._df_result + + @property + def np_stream(self) -> StreamContext: + return StreamContext(self, self._np_stream()) + + @property + def df_stream(self) -> StreamContext: + return StreamContext(self, self._df_stream()) + + def close(self): + if self._block_gen is not None: + self._block_gen.close() + self._block_gen = None + if self.source: + self.source.close() + self.source = None diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/options.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/options.py new file mode 100644 index 0000000000..4cec665c03 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/options.py @@ -0,0 +1,52 @@ +from clickhouse_connect.driver.exceptions import NotSupportedError + +pd_time_test = None +pd_extended_dtypes = False + +try: + import numpy as np +except ImportError: + np = None + +try: + import pandas as pd + pd_extended_dtypes = not pd.__version__.startswith('0') + try: + from pandas.core.dtypes.common import is_datetime64_dtype + from pandas.core.dtypes.common import is_timedelta64_dtype + + def combined_test(arr_or_dtype): + return is_datetime64_dtype(arr_or_dtype) or is_timedelta64_dtype(arr_or_dtype) + + pd_time_test = combined_test + except ImportError: + try: + from pandas.core.dtypes.common import is_datetime_or_timedelta_dtype + pd_time_test = is_datetime_or_timedelta_dtype + except ImportError as ex: + raise NotSupportedError('pandas version does not contain expected test for temporal types') from ex +except ImportError: + pd = None + +try: + import pyarrow as arrow +except ImportError: + arrow = None + + +def check_numpy(): + if np: + return np + raise NotSupportedError('Numpy package is not installed') + + +def check_pandas(): + if pd: + return pd + raise NotSupportedError('Pandas package is not installed') + + +def check_arrow(): + if arrow: + return arrow + raise NotSupportedError('PyArrow package is not installed') diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/parser.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/parser.py new file mode 100644 index 0000000000..a158e7f999 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/parser.py @@ -0,0 +1,166 @@ +from typing import Union, Tuple + +from clickhouse_connect.driver.common import unescape_identifier + + +# pylint: disable=too-many-branches +def parse_callable(expr) -> Tuple[str, Tuple[Union[str, int], ...], str]: + """ + Parses a single 
level ClickHouse optionally 'callable' function/identifier. The identifier is returned as the + first value in the response tuple. If the expression is callable -- i.e. an identifier followed by 0 or more + arguments in parentheses, the second returned value is a tuple of the comma separated arguments. The third and + final tuple value is any text remaining after the initial expression for further parsing/processing. + + Examples: + "Tuple(String, Enum('one' = 1, 'two' = 2))" will return "Tuple", ("String", "Enum('one' = 1,'two' = 2)"), "" + "MergeTree() PARTITION BY key" will return "MergeTree", (), "PARTITION BY key" + + :param expr: ClickHouse DDL or Column Name expression + :return: Tuple of the identifier, a tuple of arguments, and remaining text + """ + expr = expr.strip() + pos = expr.find('(') + space = expr.find(' ') + if pos == -1 and space == -1: + return expr, (), '' + if space != -1 and (pos == -1 or space < pos): + return expr[:space], (), expr[space:].strip() + name = expr[:pos] + pos += 1 # Skip first paren + values = [] + value = '' + in_str = False + level = 0 + + def add_value(): + try: + values.append(int(value)) + except ValueError: + values.append(value) + + while True: + char = expr[pos] + pos += 1 + if in_str: + value += char + if char == "'": + in_str = False + elif char == '\\' and expr[pos] == "'" and expr[pos:pos + 4] != "' = " and expr[pos:pos + 2] != "')": + value += expr[pos] + pos += 1 + else: + if level == 0: + if char == ' ': + space = pos + temp_char = expr[space] + while temp_char == ' ': + space += 1 + temp_char = expr[space] + if not value or temp_char in "()',=><0": + char = temp_char + pos = space + 1 + if char == ',': + add_value() + value = '' + continue + if char == ')': + break + if char == "'" and (not value or 'Enum' in value): + in_str = True + elif char == '(': + level += 1 + elif char == ')' and level: + level -= 1 + value += char + if value != '': + add_value() + return name, tuple(values), expr[pos:].strip() + + +def parse_enum(expr) -> Tuple[Tuple[str], Tuple[int]]: + """ + Parse a ClickHouse enum definition expression of the form ('key1' = 1, 'key2' = 2) + :param expr: ClickHouse enum expression/arguments + :return: Parallel tuples of string enum keys and integer enum values + """ + keys = [] + values = [] + pos = expr.find('(') + 1 + in_key = False + key = [] + value = [] + while True: + char = expr[pos] + pos += 1 + if in_key: + if char == "'": + keys.append(''.join(key)) + key = [] + in_key = False + elif char == '\\' and expr[pos] == "'" and expr[pos:pos + 4] != "' = " and expr[pos:] != "')": + key.append(expr[pos]) + pos += 1 + else: + key.append(char) + elif char not in (' ', '='): + if char == ',': + values.append(int(''.join(value))) + value = [] + elif char == ')': + values.append(int(''.join(value))) + break + elif char == "'" and not value: + in_key = True + else: + value.append(char) + values, keys = zip(*sorted(zip(values, keys))) + return tuple(keys), tuple(values) + + +def parse_columns(expr: str): + """ + Parse a ClickHouse column list of the form (col1 String, col2 Array(Tuple(String, Int32))). This also handles + unnamed columns (such as Tuple definitions). Mixed named and unnamed columns are not currently supported. 
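The parsing helpers above can be exercised directly; the expected values in the comments below follow from the parse_callable docstring and the parse_enum implementation (illustrative only):

```
from clickhouse_connect.driver.parser import parse_callable, parse_enum

name, args, remainder = parse_callable('MergeTree() PARTITION BY key')
# name == 'MergeTree', args == (), remainder == 'PARTITION BY key'

keys, values = parse_enum("Enum8('one' = 1, 'two' = 2)")
# keys == ('one', 'two'), values == (1, 2)
```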
+    :param expr: ClickHouse column list expression
+    :return: Parallel tuples of column names and column types (strings)
+    """
+    names = []
+    columns = []
+    pos = 1
+    named = False
+    level = 0
+    label = ''
+    in_str = False
+    while True:
+        char = expr[pos]
+        pos += 1
+        if in_str:
+            if "'" == char:
+                in_str = False
+            elif char == '\\' and expr[pos] == "'" and expr[pos:pos + 4] != "' = " and expr[pos:pos + 2] != "')":
+                label += expr[pos]
+                pos += 1
+        else:
+            if level == 0:
+                if char == ' ':
+                    if label and not named:
+                        names.append(unescape_identifier(label))
+                        label = ''
+                        named = True
+                    char = ''
+                elif char == ',':
+                    columns.append(label)
+                    named = False
+                    label = ''
+                    continue
+                elif char == ')':
+                    columns.append(label)
+                    break
+            if char == "'" and (not label or 'Enum' in label):
+                in_str = True
+            elif char == '(':
+                level += 1
+            elif char == ')':
+                level -= 1
+        label += char
+    return tuple(names), tuple(columns)
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/query.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/query.py
new file mode 100644
index 0000000000..0b5086ae11
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/query.py
@@ -0,0 +1,496 @@
+import ipaddress
+import logging
+import re
+import uuid
+import pytz
+
+from enum import Enum
+from typing import Any, Tuple, Dict, Sequence, Optional, Union, Generator
+from datetime import date, datetime, tzinfo
+
+from pytz.exceptions import UnknownTimeZoneError
+
+from clickhouse_connect import common
+from clickhouse_connect.driver.common import dict_copy, empty_gen, StreamContext
+from clickhouse_connect.driver.external import ExternalData
+from clickhouse_connect.driver.types import Matrix, Closable
+from clickhouse_connect.json_impl import any_to_json
+from clickhouse_connect.driver.exceptions import StreamClosedError, ProgrammingError
+from clickhouse_connect.driver.options import check_arrow, pd_extended_dtypes
+from clickhouse_connect.driver.context import BaseQueryContext
+
+logger = logging.getLogger(__name__)
+commands = 'CREATE|ALTER|SYSTEM|GRANT|REVOKE|CHECK|DETACH|DROP|DELETE|KILL|' + \
+           'OPTIMIZE|SET|RENAME|TRUNCATE|USE'
+
+limit_re = re.compile(r'\s+LIMIT($|\s)', re.IGNORECASE)
+select_re = re.compile(r'(^|\s)SELECT\s', re.IGNORECASE)
+insert_re = re.compile(r'(^|\s)INSERT\s*INTO', re.IGNORECASE)
+command_re = re.compile(r'(^\s*)(' + commands + r')\s', re.IGNORECASE)
+external_bind_re = re.compile(r'{.+:.+}')
+
+
+# pylint: disable=too-many-instance-attributes
+class QueryContext(BaseQueryContext):
+    """
+    Argument/parameter object for queries. This context is used to set thread/query specific formats
+    """
+
+    # pylint: disable=duplicate-code,too-many-arguments,too-many-locals
+    def __init__(self,
+                 query: str = '',
+                 parameters: Optional[Dict[str, Any]] = None,
+                 settings: Optional[Dict[str, Any]] = None,
+                 query_formats: Optional[Dict[str, str]] = None,
+                 column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
+                 encoding: Optional[str] = None,
+                 server_tz: tzinfo = pytz.UTC,
+                 use_none: Optional[bool] = None,
+                 column_oriented: Optional[bool] = None,
+                 use_numpy: Optional[bool] = None,
+                 max_str_len: Optional[int] = 0,
+                 query_tz: Optional[Union[str, tzinfo]] = None,
+                 column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
+                 use_extended_dtypes: Optional[bool] = None,
+                 as_pandas: bool = False,
+                 streaming: bool = False,
+                 apply_server_tz: bool = False,
+                 external_data: Optional[ExternalData] = None):
+        """
+        Initializes various configuration settings for the query context
+
+        :param query: Query string with Python style format value replacements
+        :param parameters: Optional dictionary of substitution values
+        :param settings: Optional ClickHouse settings for the query
+        :param query_formats: Optional dictionary of query formats with the key of a ClickHouse type name
+            (with * wildcards) and a value of valid query formats for those types.
+            The value 'encoding' can be sent to change the expected encoding for this query, with a value of
+            the desired encoding such as `latin-1`
+        :param column_formats: Optional dictionary of column specific formats. The key is the column name,
+            the value is either the format for the data column (such as 'string' for a UUID column) or a
+            second level "format" dictionary of a ClickHouse type name and a value of query formats. This
+            secondary dictionary can be used for nested column types such as Tuples or Maps
+        :param encoding: Optional string encoding for this query, such as 'latin-1'
+        :param use_none: Use a Python None for ClickHouse NULL values in nullable columns. Otherwise the default
+            value of the column (such as 0 for numbers) will be returned in the result_set
+        :param max_str_len: Limit returned ClickHouse String values to this length, which allows a Numpy
+            structured array even with ClickHouse variable length String columns. If 0, Numpy arrays for
+            String columns will always be object arrays
+        :param query_tz: Either a string or a pytz tzinfo object. (Strings will be converted to tzinfo objects).
+            Values for any DateTime or DateTime64 column in the query will be converted to Python datetime.datetime
+            objects with the selected timezone
+        :param column_tzs: A dictionary of column names to tzinfo objects (or strings that will be converted to
+            tzinfo objects).
The timezone will be applied to datetime objects returned in the query + """ + super().__init__(settings, + query_formats, + column_formats, + encoding, + use_extended_dtypes if use_extended_dtypes is not None else False, + use_numpy if use_numpy is not None else False) + self.query = query + self.parameters = parameters or {} + self.use_none = True if use_none is None else use_none + self.column_oriented = False if column_oriented is None else column_oriented + self.use_numpy = use_numpy + self.max_str_len = 0 if max_str_len is None else max_str_len + self.server_tz = server_tz + self.apply_server_tz = apply_server_tz + self.external_data = external_data + if isinstance(query_tz, str): + try: + query_tz = pytz.timezone(query_tz) + except UnknownTimeZoneError as ex: + raise ProgrammingError(f'query_tz {query_tz} is not recognized') from ex + self.query_tz = query_tz + if column_tzs is not None: + for col_name, timezone in column_tzs.items(): + if isinstance(timezone, str): + try: + timezone = pytz.timezone(timezone) + column_tzs[col_name] = timezone + except UnknownTimeZoneError as ex: + raise ProgrammingError(f'column_tz {timezone} is not recognized') from ex + self.column_tzs = column_tzs + self.column_tz = None + self.response_tz = None + self.block_info = False + self.as_pandas = as_pandas + self.use_pandas_na = as_pandas and pd_extended_dtypes + self.streaming = streaming + self._update_query() + + @property + def is_select(self) -> bool: + return select_re.search(self.uncommented_query) is not None + + @property + def has_limit(self) -> bool: + return limit_re.search(self.uncommented_query) is not None + + @property + def is_insert(self) -> bool: + return insert_re.search(self.uncommented_query) is not None + + @property + def is_command(self) -> bool: + return command_re.search(self.uncommented_query) is not None + + def set_parameters(self, parameters: Dict[str, Any]): + self.parameters = parameters + self._update_query() + + def set_parameter(self, key: str, value: Any): + if not self.parameters: + self.parameters = {} + self.parameters[key] = value + self._update_query() + + def set_response_tz(self, response_tz: tzinfo): + self.response_tz = response_tz + + def start_column(self, name: str): + super().start_column(name) + if self.column_tzs and name in self.column_tzs: + self.column_tz = self.column_tzs[name] + else: + self.column_tz = None + + def active_tz(self, datatype_tz: Optional[tzinfo]): + if self.column_tz: + active_tz = self.column_tz + elif datatype_tz: + active_tz = datatype_tz + elif self.query_tz: + active_tz = self.query_tz + elif self.response_tz: + active_tz = self.response_tz + elif self.apply_server_tz: + active_tz = self.server_tz + else: + active_tz = self.local_tz + # Special case where if everything is UTC, including the local timezone, we use naive timezones + # for performance reasons + if active_tz == pytz.UTC and active_tz.utcoffset(datetime.now()) == self.local_tz.utcoffset(datetime.now()): + return None + return active_tz + + def updated_copy(self, + query: Optional[str] = None, + parameters: Optional[Dict[str, Any]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + server_tz: Optional[tzinfo] = None, + use_none: Optional[bool] = None, + column_oriented: Optional[bool] = None, + use_numpy: Optional[bool] = None, + max_str_len: Optional[int] = None, + query_tz: Optional[Union[str, tzinfo]] = 
None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + use_extended_dtypes: Optional[bool] = None, + as_pandas: bool = False, + streaming: bool = False, + external_data: Optional[ExternalData] = None) -> 'QueryContext': + """ + Creates Query context copy with parameters overridden/updated as appropriate. + """ + return QueryContext(query or self.query, + dict_copy(self.parameters, parameters), + dict_copy(self.settings, settings), + dict_copy(self.query_formats, query_formats), + dict_copy(self.column_formats, column_formats), + encoding if encoding else self.encoding, + server_tz if server_tz else self.server_tz, + self.use_none if use_none is None else use_none, + self.column_oriented if column_oriented is None else column_oriented, + self.use_numpy if use_numpy is None else use_numpy, + self.max_str_len if max_str_len is None else max_str_len, + self.query_tz if query_tz is None else query_tz, + self.column_tzs if column_tzs is None else column_tzs, + self.use_extended_dtypes if use_extended_dtypes is None else use_extended_dtypes, + as_pandas, + streaming, + self.apply_server_tz, + self.external_data if external_data is None else external_data) + + def _update_query(self): + self.final_query, self.bind_params = bind_query(self.query, self.parameters, self.server_tz) + self.uncommented_query = remove_sql_comments(self.final_query) + + +class QueryResult(Closable): + """ + Wrapper class for query return values and metadata + """ + + # pylint: disable=too-many-arguments + def __init__(self, + result_set: Matrix = None, + block_gen: Generator[Matrix, None, None] = None, + column_names: Tuple = (), + column_types: Tuple = (), + column_oriented: bool = False, + source: Closable = None, + query_id: str = None, + summary: Dict[str, Any] = None): + self._result_rows = result_set + self._result_columns = None + self._block_gen = block_gen or empty_gen() + self._in_context = False + self._query_id = query_id + self.column_names = column_names + self.column_types = column_types + self.column_oriented = column_oriented + self.source = source + self.summary = {} if summary is None else summary + + @property + def result_set(self) -> Matrix: + if self.column_oriented: + return self.result_columns + return self.result_rows + + @property + def result_columns(self) -> Matrix: + if self._result_columns is None: + result = [[] for _ in range(len(self.column_names))] + with self.column_block_stream as stream: + for block in stream: + for base, added in zip(result, block): + base.extend(added) + self._result_columns = result + return self._result_columns + + @property + def result_rows(self) -> Matrix: + if self._result_rows is None: + result = [] + with self.row_block_stream as stream: + for block in stream: + result.extend(block) + self._result_rows = result + return self._result_rows + + @property + def query_id(self) -> str: + query_id = self.summary.get('query_id') + if query_id: + return query_id + return self._query_id + + def _column_block_stream(self): + if self._block_gen is None: + raise StreamClosedError + block_stream = self._block_gen + self._block_gen = None + return block_stream + + def _row_block_stream(self): + for block in self._column_block_stream(): + yield list(zip(*block)) + + @property + def column_block_stream(self) -> StreamContext: + return StreamContext(self, self._column_block_stream()) + + @property + def row_block_stream(self): + return StreamContext(self, self._row_block_stream()) + + @property + def rows_stream(self) -> StreamContext: + def stream(): + for 
block in self._row_block_stream(): + for row in block: + yield row + + return StreamContext(self, stream()) + + def named_results(self) -> Generator[dict, None, None]: + for row in zip(*self.result_set) if self.column_oriented else self.result_set: + yield dict(zip(self.column_names, row)) + + @property + def row_count(self) -> int: + if self.column_oriented: + return 0 if len(self.result_set) == 0 else len(self.result_set[0]) + return len(self.result_set) + + @property + def first_item(self): + if self.column_oriented: + return {name: col[0] for name, col in zip(self.column_names, self.result_set)} + return dict(zip(self.column_names, self.result_set[0])) + + @property + def first_row(self): + if self.column_oriented: + return [col[0] for col in self.result_set] + return self.result_set[0] + + def close(self): + if self.source: + self.source.close() + self.source = None + if self._block_gen is not None: + self._block_gen.close() + self._block_gen = None + + +BS = '\\' +must_escape = (BS, '\'') + + +def quote_identifier(identifier: str): + first_char = identifier[0] + if first_char in ('`', '"') and identifier[-1] == first_char: + # Identifier is already quoted, assume that it's valid + return identifier + return f'`{identifier}`' + + +def finalize_query(query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]], + server_tz: Optional[tzinfo] = None) -> str: + if not parameters: + return query + if hasattr(parameters, 'items'): + return query % {k: format_query_value(v, server_tz) for k, v in parameters.items()} + return query % tuple(format_query_value(v) for v in parameters) + + +def bind_query(query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]], + server_tz: Optional[tzinfo] = None) -> Tuple[str, Dict[str, str]]: + if not parameters: + return query, {} + if external_bind_re.search(query) is None: + return finalize_query(query, parameters, server_tz), {} + return query, {f'param_{k}': format_bind_value(v, server_tz) for k, v in parameters.items()} + + +def format_str(value: str): + return f"'{escape_str(value)}'" + + +def escape_str(value: str): + return ''.join(f'{BS}{c}' if c in must_escape else c for c in value) + + +# pylint: disable=too-many-return-statements +def format_query_value(value: Any, server_tz: tzinfo = pytz.UTC): + """ + Format Python values in a ClickHouse query + :param value: Python object + :param server_tz: Server timezone for adjusting datetime values + :return: Literal string for python value + """ + if value is None: + return 'NULL' + if isinstance(value, str): + return format_str(value) + if isinstance(value, datetime): + if value.tzinfo is None: + value = value.replace(tzinfo=server_tz) + return f"'{value.strftime('%Y-%m-%d %H:%M:%S')}'" + if isinstance(value, date): + return f"'{value.isoformat()}'" + if isinstance(value, list): + return f"[{', '.join(format_query_value(x, server_tz) for x in value)}]" + if isinstance(value, tuple): + return f"({', '.join(format_query_value(x, server_tz) for x in value)})" + if isinstance(value, dict): + if common.get_setting('dict_parameter_format') == 'json': + return format_str(any_to_json(value).decode()) + pairs = [format_query_value(k, server_tz) + ':' + format_query_value(v, server_tz) + for k, v in value.items()] + return f"{{{', '.join(pairs)}}}" + if isinstance(value, Enum): + return format_query_value(value.value, server_tz) + if isinstance(value, (uuid.UUID, ipaddress.IPv4Address, ipaddress.IPv6Address)): + return f"'{value}'" + return str(value) + + +# pylint: disable=too-many-branches +def 
format_bind_value(value: Any, server_tz: tzinfo = pytz.UTC, top_level: bool = True): + """ + Format Python values in a ClickHouse query + :param value: Python object + :param server_tz: Server timezone for adjusting datetime values + :param top_level: Flag for top level for nested structures + :return: Literal string for python value + """ + + def recurse(x): + return format_bind_value(x, server_tz, False) + + if value is None: + return '\\N' + if isinstance(value, str): + if top_level: + # At the top levels, strings must not be surrounded by quotes + return escape_str(value) + return format_str(value) + if isinstance(value, datetime): + if value.tzinfo is None: + value = value.replace(tzinfo=server_tz) + val = value.strftime('%Y-%m-%d %H:%M:%S') + if top_level: + return val + return f"'{val}'" + if isinstance(value, date): + if top_level: + return value.isoformat() + return f"'{value.isoformat()}'" + if isinstance(value, list): + return f"[{', '.join(recurse(x) for x in value)}]" + if isinstance(value, tuple): + return f"({', '.join(recurse(x) for x in value)})" + if isinstance(value, dict): + if common.get_setting('dict_parameter_format') == 'json': + return any_to_json(value).decode() + pairs = [recurse(k) + ':' + recurse(v) + for k, v in value.items()] + return f"{{{', '.join(pairs)}}}" + if isinstance(value, Enum): + return recurse(value.value) + return str(value) + + +comment_re = re.compile(r"(\".*?\"|\'.*?\')|(/\*.*?\*/|(--\s)[^\n]*$)", re.MULTILINE | re.DOTALL) + + +def remove_sql_comments(sql: str) -> str: + """ + Remove SQL comments. This is useful to determine the type of SQL query, such as SELECT or INSERT, but we + don't fully trust it to correctly ignore weird quoted strings, and other edge cases, so we always pass the + original SQL to ClickHouse (which uses a full-fledged AST/ token parser) + :param sql: SQL query + :return: SQL Query without SQL comments + """ + + def replacer(match): + # if the 2nd group (capturing comments) is not None, it means we have captured a + # non-quoted, actual comment string, so return nothing to remove the comment + if match.group(2): + return '' + # Otherwise we've actually captured a quoted string, so return it + return match.group(1) + + return comment_re.sub(replacer, sql) + + +def to_arrow(content: bytes): + pyarrow = check_arrow() + reader = pyarrow.ipc.RecordBatchFileReader(content) + return reader.read_all() + + +def arrow_buffer(table) -> Tuple[Sequence[str], bytes]: + pyarrow = check_arrow() + sink = pyarrow.BufferOutputStream() + with pyarrow.RecordBatchFileWriter(sink, table.schema) as writer: + writer.write(table) + return table.schema.names, sink.getvalue() diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/summary.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/summary.py new file mode 100644 index 0000000000..ef152cad76 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/summary.py @@ -0,0 +1,39 @@ +from typing import Optional + +from clickhouse_connect.datatypes.registry import get_from_name + +from clickhouse_connect.driver.query import QueryResult + + +class QuerySummary: + summary = {} + + def __init__(self, summary: Optional[dict] = None): + if summary is not None: + self.summary = summary + + @property + def written_rows(self) -> int: + return int(self.summary.get('written_rows', 0)) + + def written_bytes(self) -> int: + return int(self.summary.get('written_bytes', 0)) + + def query_id(self) -> str: + return self.summary.get('query_id', '') + + def 
as_query_result(self) -> QueryResult: + data = [] + column_names = [] + column_types = [] + str_type = get_from_name('String') + int_type = get_from_name('Int64') + for key, value in self.summary.items(): + column_names.append(key) + if value.isnumeric(): + data.append(int(value)) + column_types.append(int_type) + else: + data.append(value) + column_types.append(str_type) + return QueryResult([data], column_names=tuple(column_names), column_types=tuple(column_types)) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/tools.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/tools.py new file mode 100644 index 0000000000..420686cd64 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/tools.py @@ -0,0 +1,28 @@ +from typing import Optional, Sequence, Dict, Any + +from clickhouse_connect.driver import Client +from clickhouse_connect.driver.summary import QuerySummary +from clickhouse_connect.driver.query import quote_identifier + + +def insert_file(client: Client, + table: str, + file_path: str, + fmt: Optional[str] = None, + column_names: Optional[Sequence[str]] = None, + database: Optional[str] = None, + settings: Optional[Dict[str, Any]] = None, + compression: Optional[str] = None) -> QuerySummary: + full_table = f'{quote_identifier(database)}.{quote_identifier(table)}' if database else quote_identifier(table) + if not fmt: + fmt = 'CSV' if column_names else 'CSVWithNames' + if compression is None: + if file_path.endswith('.gzip') or file_path.endswith('.gz'): + compression = 'gzip' + with open(file_path, 'rb') as file: + return client.raw_insert(full_table, + column_names=column_names, + insert_block=file, + fmt=fmt, + settings=settings, + compression=compression) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/transform.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/transform.py new file mode 100644 index 0000000000..e781f63179 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/transform.py @@ -0,0 +1,118 @@ +import logging +from typing import Union + +from clickhouse_connect.datatypes import registry +from clickhouse_connect.driver.common import write_leb128 +from clickhouse_connect.driver.exceptions import StreamCompleteException, StreamFailureError +from clickhouse_connect.driver.insert import InsertContext +from clickhouse_connect.driver.npquery import NumpyResult +from clickhouse_connect.driver.query import QueryResult, QueryContext +from clickhouse_connect.driver.types import ByteSource +from clickhouse_connect.driver.compression import get_compressor + +_EMPTY_CTX = QueryContext() + +logger = logging.getLogger(__name__) + + +class NativeTransform: + # pylint: disable=too-many-locals + @staticmethod + def parse_response(source: ByteSource, context: QueryContext = _EMPTY_CTX) -> Union[NumpyResult, QueryResult]: + names = [] + col_types = [] + block_num = 0 + + def get_block(): + nonlocal block_num + result_block = [] + try: + try: + if context.block_info: + source.read_bytes(8) + num_cols = source.read_leb128() + except StreamCompleteException: + return None + num_rows = source.read_leb128() + for col_num in range(num_cols): + name = source.read_leb128_str() + type_name = source.read_leb128_str() + if block_num == 0: + names.append(name) + col_type = registry.get_from_name(type_name) + col_types.append(col_type) + else: + col_type = col_types[col_num] + if num_rows == 0: + result_block.append(tuple()) + else: + context.start_column(name) + column = 
col_type.read_column(source, num_rows, context) + result_block.append(column) + except Exception as ex: + source.close() + if isinstance(ex, StreamCompleteException): + # We ran out of data before it was expected, this could be ClickHouse reporting an error + # in the response + message = source.last_message + if len(message) > 1024: + message = message[-1024:] + error_start = message.find('Code: ') + if error_start != -1: + message = message[error_start:] + raise StreamFailureError(message) from None + raise + block_num += 1 + return result_block + + first_block = get_block() + if first_block is None: + return NumpyResult() if context.use_numpy else QueryResult([]) + + def gen(): + yield first_block + while True: + next_block = get_block() + if next_block is None: + return + yield next_block + + if context.use_numpy: + res_types = [col.dtype if hasattr(col, 'dtype') else 'O' for col in first_block] + return NumpyResult(gen(), tuple(names), tuple(col_types), res_types, source) + return QueryResult(None, gen(), tuple(names), tuple(col_types), context.column_oriented, source) + + @staticmethod + def build_insert(context: InsertContext): + compressor = get_compressor(context.compression) + + def chunk_gen(): + for block in context.next_block(): + output = bytearray() + output += block.prefix + write_leb128(block.column_count, output) + write_leb128(block.row_count, output) + for col_name, col_type, data in zip(block.column_names, block.column_types, block.column_data): + write_leb128(len(col_name), output) + output += col_name.encode() + write_leb128(len(col_type.name), output) + output += col_type.name.encode() + context.start_column(col_name) + try: + col_type.write_column(data, output, context) + except Exception as ex: # pylint: disable=broad-except + # This is hideous, but some low level serializations can fail while streaming + # the insert if the user has included bad data in the column. 
We need to ensure that the + # insert fails (using garbage data) to avoid a partial insert, and use the context to + # propagate the correct exception to the user + logger.error('Error serializing column `%s` into into data type `%s`', + col_name, col_type.name, exc_info=True) + context.insert_exception = ex + yield 'INTERNAL EXCEPTION WHILE SERIALIZING'.encode() + return + yield compressor.compress_block(output) + footer = compressor.flush() + if footer: + yield footer + + return chunk_gen() diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/types.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/types.py new file mode 100644 index 0000000000..015e162fbe --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/types.py @@ -0,0 +1,50 @@ +from abc import ABC, abstractmethod +from typing import Sequence, Any + +Matrix = Sequence[Sequence[Any]] + + +class Closable(ABC): + @abstractmethod + def close(self): + pass + + +class ByteSource(Closable): + last_message = None + + @abstractmethod + def read_leb128(self) -> int: + pass + + @abstractmethod + def read_leb128_str(self) -> str: + pass + + @abstractmethod + def read_uint64(self) -> int: + pass + + @abstractmethod + def read_bytes(self, sz: int) -> bytes: + pass + + @abstractmethod + def read_str_col(self, num_rows: int, encoding: str, nullable: bool = False, null_obj: Any = None): + pass + + @abstractmethod + def read_bytes_col(self, sz: int, num_rows: int): + pass + + @abstractmethod + def read_fixed_str_col(self, sz: int, num_rows: int, encoding: str): + pass + + @abstractmethod + def read_array(self, array_type: str, num_rows: int): + pass + + @abstractmethod + def read_byte(self) -> int: + pass diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driverc/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/driverc/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driverc/__init__.py diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driverc/buffer.pxd b/contrib/python/clickhouse-connect/clickhouse_connect/driverc/buffer.pxd new file mode 100644 index 0000000000..b522053fec --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driverc/buffer.pxd @@ -0,0 +1,12 @@ +cdef class ResponseBuffer: + cdef: + unsigned long long buf_loc, buf_sz, slice_sz + signed long long slice_start + object gen, source + char* buffer + char* slice + unsigned char _read_byte_load(self) except ?255 + char* read_bytes_c(self, unsigned long long sz) except NULL + Py_buffer buff_source + cdef object _read_str_col(self, unsigned long long num_rows, char * encoding) + cdef object _read_nullable_str_col(self, unsigned long long num_rows, char * encoding, object null_obj) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driverc/buffer.pyx b/contrib/python/clickhouse-connect/clickhouse_connect/driverc/buffer.pyx new file mode 100644 index 0000000000..9e774cd514 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driverc/buffer.pyx @@ -0,0 +1,308 @@ +import sys +from typing import Iterable, Any, Optional + +import cython + +from cpython cimport Py_INCREF, array +import array +from cpython.unicode cimport PyUnicode_Decode +from cpython.tuple cimport PyTuple_New, PyTuple_SET_ITEM +from cpython.bytes cimport PyBytes_FromStringAndSize +from cpython.buffer cimport PyObject_GetBuffer, PyBuffer_Release, PyBUF_ANY_CONTIGUOUS, PyBUF_SIMPLE 
+from cpython.mem cimport PyMem_Free, PyMem_Malloc +from libc.string cimport memcpy + +from clickhouse_connect.driver.exceptions import StreamCompleteException + +cdef union ull_wrapper: + char* source + unsigned long long int_value + +cdef char * errors = 'strict' +cdef char * utf8 = 'utf8' +cdef dict array_templates = {} +cdef bint must_swap = sys.byteorder == 'big' +cdef array.array swapper = array.array('Q', [0]) + +for c in 'bBuhHiIlLqQfd': + array_templates[c] = array.array(c, []) + + +cdef class ResponseBuffer: + def __init__(self, source): + self.slice_sz = 4096 + self.buf_loc = 0 + self.buf_sz = 0 + self.source = source + self.gen = source.gen + self.buffer = NULL + self.slice = <char*>PyMem_Malloc(self.slice_sz) + + # Note that return char * return from this method is only good until the next call to _read_bytes_c or + # _read_byte_load, since it points into self.buffer which can be replaced with the next chunk from the stream + # Accordingly, that memory MUST be copied/processed into another buffer/PyObject immediately + @cython.boundscheck(False) + @cython.wraparound(False) + cdef char * read_bytes_c(self, unsigned long long sz) except NULL: + cdef unsigned long long x, e, tail, cur_len, temp + cdef char* ptr + e = self.buf_sz + + if self.buf_loc + sz <= e: + # We still have "sz" unread bytes available in the buffer, return the currently loc and advance it + temp = self.buf_loc + self.buf_loc += sz + return self.buffer + temp + + # We need more data than is currently in the buffer, copy what's left into the temporary slice, + # get a new buffer, and append what we need from the new buffer into that slice + cur_len = e - self.buf_loc + temp = self.slice_sz # + while temp < sz * 2: + temp <<= 1 + if temp > self.slice_sz: + PyMem_Free(self.slice) + self.slice = <char*>PyMem_Malloc(temp) + self.slice_sz = temp + if cur_len > 0: + memcpy(self.slice, self.buffer + self.buf_loc, cur_len) + self.buf_loc = 0 + self.buf_sz = 0 + + # Loop until we've read enough chunks to fill the requested size + while cur_len < sz: + chunk = next(self.gen, None) + if not chunk: + raise StreamCompleteException + x = len(chunk) + ptr = <char *> chunk + if cur_len + x <= sz: + # We need this whole chunk for the requested size, copy it into the temporary slice and get the next one + memcpy(self.slice + cur_len, ptr, x) + cur_len += x + else: + # We need just the beginning of this chunk to finish the temporary, copy that and set + # the pointer into our stored buffer to the first unread data + tail = sz - cur_len + memcpy(self.slice + cur_len, ptr, tail) + PyBuffer_Release(&self.buff_source) + PyObject_GetBuffer(chunk, &self.buff_source, PyBUF_SIMPLE | PyBUF_ANY_CONTIGUOUS) + self.buffer = <char *> self.buff_source.buf + self.buf_sz = x + self.buf_loc = tail + cur_len += tail + return self.slice + + @cython.boundscheck(False) + @cython.wraparound(False) + cdef inline unsigned char _read_byte_load(self) except ?255: + self.buf_loc = 0 + self.buf_sz = 0 + chunk = next(self.gen, None) + if not chunk: + raise StreamCompleteException + x = len(chunk) + py_chunk = chunk + if x > 1: + PyBuffer_Release(&self.buff_source) + PyObject_GetBuffer(chunk, &self.buff_source, PyBUF_SIMPLE | PyBUF_ANY_CONTIGUOUS) + self.buffer = <char *> self.buff_source.buf + self.buf_loc = 1 + self.buf_sz = x + return <unsigned char>chunk[0] + + @cython.boundscheck(False) + @cython.wraparound(False) + cdef inline object _read_str_col(self, unsigned long long num_rows, char * encoding): + cdef object column = PyTuple_New(num_rows), v + cdef 
unsigned long long x = 0, sz, shift + cdef unsigned char b + cdef char* buf + while x < num_rows: + sz = 0 + shift = 0 + while 1: + if self.buf_loc < self.buf_sz: + b = self.buffer[self.buf_loc] + self.buf_loc += 1 + else: + b = self._read_byte_load() + sz += ((b & 0x7f) << shift) + if (b & 0x80) == 0: + break + shift += 7 + buf = self.read_bytes_c(sz) + if encoding: + try: + v = PyUnicode_Decode(buf, sz, encoding, errors) + except UnicodeDecodeError: + v = PyBytes_FromStringAndSize(buf, sz).hex() + else: + v = PyBytes_FromStringAndSize(buf, sz) + PyTuple_SET_ITEM(column, x, v) + Py_INCREF(v) + x += 1 + return column + + @cython.boundscheck(False) + @cython.wraparound(False) + cdef inline object _read_nullable_str_col(self, unsigned long long num_rows, char * encoding, object null_obj): + cdef object column = PyTuple_New(num_rows), v + cdef unsigned long long x = 0, sz, shift + cdef unsigned char b + cdef char * buf + cdef char * null_map = <char *> PyMem_Malloc(<size_t> num_rows) + memcpy(<void *> null_map, <void *> self.read_bytes_c(num_rows), num_rows) + for x in range(num_rows): + if self.buf_loc < self.buf_sz: + b = self.buffer[self.buf_loc] + self.buf_loc += 1 + else: + b = self._read_byte_load() + shift = 0 + sz = b & 0x7f + while b & 0x80: + shift += 7 + if self.buf_loc < self.buf_sz: + b = self.buffer[self.buf_loc] + self.buf_loc += 1 + else: + b = self._read_byte_load() + sz += ((b & 0x7f) << shift) + buf = self.read_bytes_c(sz) + if null_map[x]: + v = null_obj + elif encoding: + try: + v = PyUnicode_Decode(buf, sz, encoding, errors) + except UnicodeDecodeError: + v = PyBytes_FromStringAndSize(buf, sz).hex() + else: + v = PyBytes_FromStringAndSize(buf, sz) + PyTuple_SET_ITEM(column, x, v) + Py_INCREF(v) + PyMem_Free(<void *> null_map) + return column + + @cython.boundscheck(False) + @cython.wraparound(False) + def read_byte(self) -> int: + if self.buf_loc < self.buf_sz: + b = self.buffer[self.buf_loc] + self.buf_loc += 1 + return b + b = self._read_byte_load() + return b + + def read_leb128_str(self) -> str: + cdef unsigned long long sz = self.read_leb128() + cdef char * b = self.read_bytes_c(sz) + return PyUnicode_Decode(b, sz, utf8, errors) + + @cython.boundscheck(False) + @cython.wraparound(False) + def read_leb128(self) -> int: + cdef: + unsigned long long sz = 0, shift = 0 + unsigned char b + while 1: + if self.buf_loc < self.buf_sz: + b = self.buffer[self.buf_loc] + self.buf_loc += 1 + else: + b = self._read_byte_load() + sz += ((b & 0x7f) << shift) + if (b & 0x80) == 0: + return sz + shift += 7 + + @cython.boundscheck(False) + @cython.wraparound(False) + def read_uint64(self) -> int: + cdef ull_wrapper* x + cdef char* b = self.read_bytes_c(8) + if must_swap: + memcpy(swapper.data.as_voidptr, b, 8) + swapper.byteswap() + return swapper[0] + x = <ull_wrapper *> b + return x.int_value + + @cython.boundscheck(False) + @cython.wraparound(False) + def read_bytes(self, unsigned long long sz) -> bytes: + cdef char* b = self.read_bytes_c(sz) + return b[:sz] + + def read_str_col(self, + unsigned long long num_rows, + encoding: Optional[str], + nullable: bool = False, + null_object: Any = None) -> Iterable[str]: + cdef char * enc = NULL + if encoding: + pyenc = encoding.encode() + enc = pyenc + if nullable: + return self._read_nullable_str_col(num_rows, enc, null_object) + return self._read_str_col(num_rows, enc) + + @cython.boundscheck(False) + @cython.wraparound(False) + def read_array(self, t: str, unsigned long long num_rows) -> Iterable[Any]: + cdef array.array template = 
array_templates[t] + cdef array.array result = array.clone(template, num_rows, 0) + cdef unsigned long long sz = result.itemsize * num_rows + cdef char * b = self.read_bytes_c(sz) + memcpy(result.data.as_voidptr, b, sz) + if must_swap: + result.byteswap() + return result + + @cython.boundscheck(False) + @cython.wraparound(False) + def read_bytes_col(self, unsigned long long sz, unsigned long long num_rows) -> Iterable[Any]: + cdef object column = PyTuple_New(num_rows) + cdef char * b = self.read_bytes_c(sz * num_rows) + for x in range(num_rows): + v = PyBytes_FromStringAndSize(b, sz) + b += sz + PyTuple_SET_ITEM(column, x, v) + Py_INCREF(v) + return column + + @cython.boundscheck(False) + @cython.wraparound(False) + def read_fixed_str_col(self, unsigned long long sz, unsigned long long num_rows, + encoding:str ='utf8') -> Iterable[str]: + cdef object column = PyTuple_New(num_rows) + cdef char * enc + cdef char * b = self.read_bytes_c(sz * num_rows) + cdef object v + pyenc = encoding.encode() + enc = pyenc + for x in range(num_rows): + try: + v = PyUnicode_Decode(b, sz, enc, errors) + except UnicodeDecodeError: + v = PyBytes_FromStringAndSize(b, sz).hex() + PyTuple_SET_ITEM(column, x, v) + Py_INCREF(v) + b += sz + return column + + def close(self): + if self.source: + self.source.close() + self.source = None + + @property + def last_message(self): + if self.buffer == NULL: + return None + return self.buffer[self.buf_sz:].decode() + + def __dealloc__(self): + self.close() + PyBuffer_Release(&self.buff_source) + PyMem_Free(self.slice) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driverc/dataconv.pyx b/contrib/python/clickhouse-connect/clickhouse_connect/driverc/dataconv.pyx new file mode 100644 index 0000000000..dab9114155 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driverc/dataconv.pyx @@ -0,0 +1,312 @@ +import struct +from typing import Sequence, Optional + +import array +from datetime import datetime, date + +import cython + +from .buffer cimport ResponseBuffer +from cpython cimport Py_INCREF, Py_DECREF +from cpython.buffer cimport PyBUF_READ +from cpython.mem cimport PyMem_Free, PyMem_Malloc +from cpython.tuple cimport PyTuple_New, PyTuple_SET_ITEM +from cpython.bytearray cimport PyByteArray_GET_SIZE, PyByteArray_Resize +from cpython.memoryview cimport PyMemoryView_FromMemory +from cython.view cimport array as cvarray +from ipaddress import IPv4Address +from uuid import UUID, SafeUUID +from libc.string cimport memcpy +from datetime import tzinfo + + +@cython.boundscheck(False) +@cython.wraparound(False) +def pivot(data: Sequence, unsigned long long start, unsigned long long end): + cdef unsigned long long row_count = end - start + cdef unsigned long long col_count = len(data[0]) + cdef object result = PyTuple_New(col_count) + cdef object col, v + for x in range(col_count): + col = PyTuple_New(row_count) + PyTuple_SET_ITEM(result, x, col) + Py_INCREF(col) + for y in range(row_count): + v = data[y + start][x] + PyTuple_SET_ITEM(col, y, v) + Py_INCREF(v) + return result + + +@cython.wraparound(False) +@cython.boundscheck(False) +def read_ipv4_col(ResponseBuffer buffer, unsigned long long num_rows): + cdef unsigned long long x = 0 + cdef char* loc = buffer.read_bytes_c(4 * num_rows) + cdef object column = PyTuple_New(num_rows), v + ip_new = IPv4Address.__new__ + while x < num_rows: + v = ip_new(IPv4Address) + v._ip = (<unsigned int*>loc)[0] + PyTuple_SET_ITEM(column, x, v) + Py_INCREF(v) + loc += 4 + x += 1 + return column + + 
+@cython.boundscheck(False) +@cython.wraparound(False) +def read_datetime_col(ResponseBuffer buffer, unsigned long long num_rows, tzinfo: tzinfo): + cdef unsigned long long x = 0 + cdef char * loc = buffer.read_bytes_c(4 * num_rows) + cdef object column = PyTuple_New(num_rows), v + if tzinfo is None: + fts = datetime.utcfromtimestamp + while x < num_rows: + v = fts((<unsigned int*>loc)[0]) + PyTuple_SET_ITEM(column, x, v) + Py_INCREF(v) + loc += 4 + x += 1 + else: + fts = datetime.fromtimestamp + while x < num_rows: + v = fts((<unsigned int*>loc)[0], tzinfo) + PyTuple_SET_ITEM(column, x, v) + Py_INCREF(v) + loc += 4 + x += 1 + return column + + +@cython.boundscheck(False) +@cython.wraparound(False) +def read_date_col(ResponseBuffer buffer, unsigned long long num_rows): + cdef unsigned long long x = 0 + cdef char * loc = buffer.read_bytes_c(2 * num_rows) + cdef object column = PyTuple_New(num_rows), v + while x < num_rows: + v = epoch_days_to_date((<unsigned short*>loc)[0]) + PyTuple_SET_ITEM(column, x, v) + Py_INCREF(v) + loc += 2 + x += 1 + return column + + +@cython.boundscheck(False) +@cython.wraparound(False) +def read_date32_col(ResponseBuffer buffer, unsigned long long num_rows): + cdef unsigned long long x = 0 + cdef char * loc = buffer.read_bytes_c(4 * num_rows) + cdef object column = PyTuple_New(num_rows), v + while x < num_rows: + v = epoch_days_to_date((<int*>loc)[0]) + PyTuple_SET_ITEM(column, x, v) + Py_INCREF(v) + loc += 4 + x += 1 + return column + + +cdef unsigned short* MONTH_DAYS = [0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334, 365] +cdef unsigned short* MONTH_DAYS_LEAP = [0, 31, 60, 91, 121, 152, 182, 213, 244, 274, 305, 335, 366] + +# Constants used in epoch_days_to_date +# 47482 -- Jan 1, 2100 -- Because all years 1970-2099 divisible by 4 are leap years, some extra division can be avoided +# 134774 -- Number of days between Jan 1 1601 and Jan 1 1970. Adding this starts all calculations at 1601-01-01 +# 1461 -- Number of days in a 4-year cycle (365 * 4) + 1 leap day +# 36524 -- Number of days in a 100-year cycle. 25 4-year cycles - 1 leap day for the year 100 +# 146097 -- Number of days in a 400-year cycle. 4 100 year cycles + 1 leap day for the year 400 + +# Year and offset with in the year are determined by factoring out the largest "known" year blocks in +# descending order (400/100/4/1 years). 
Month is then (over) estimated in the "day" arrays (days / 32) and +# adjusted down if too large (logic originally in the Python standard library) + +@cython.cdivision(True) +@cython.boundscheck(False) +@cython.wraparound(False) +cpdef inline object epoch_days_to_date(int days): + cdef int years, month, year, cycles400, cycles100, cycles, rem + cdef unsigned short prev + cdef unsigned short* m_list + if 0 <= days < 47482: + cycles = (days + 365) // 1461 + rem = (days + 365) - cycles * 1461 + years = rem // 365 + rem -= years * 365 + year = (cycles << 2) + years + 1969 + if years == 4: + return date(year - 1, 12, 31) + if years == 3: + m_list = MONTH_DAYS_LEAP + else: + m_list = MONTH_DAYS + else: + cycles400 = (days + 134774) // 146097 + rem = days + 134774 - (cycles400 * 146097) + cycles100 = rem // 36524 + rem -= cycles100 * 36524 + cycles = rem // 1461 + rem -= cycles * 1461 + years = rem // 365 + rem -= years * 365 + year = (cycles << 2) + cycles400 * 400 + cycles100 * 100 + years + 1601 + if years == 4 or cycles100 == 4: + return date(year - 1, 12, 31) + if years == 3 and (year == 2000 or year % 100 != 0): + m_list = MONTH_DAYS_LEAP + else: + m_list = MONTH_DAYS + month = (rem + 24) >> 5 + prev = m_list[month] + while rem < prev: + month -= 1 + prev = m_list[month] + return date(year, month + 1, rem + 1 - prev) + + +@cython.boundscheck(False) +@cython.wraparound(False) +def read_uuid_col(ResponseBuffer buffer, unsigned long long num_rows): + cdef unsigned long long x = 0 + cdef char * loc = buffer.read_bytes_c(16 * num_rows) + cdef char[16] temp + cdef object column = PyTuple_New(num_rows), v + new_uuid = UUID.__new__ + unsafe = SafeUUID.unsafe + oset = object.__setattr__ + for x in range(num_rows): + memcpy (<void *>temp, <void *>(loc + 8), 8) + memcpy (<void *>(temp + 8), <void *>loc, 8) + v = new_uuid(UUID) + oset(v, 'int', int.from_bytes(temp[:16], 'little')) + oset(v, 'is_safe', unsafe) + PyTuple_SET_ITEM(column, x, v) + Py_INCREF(v) + loc += 16 + return column + + +@cython.boundscheck(False) +@cython.wraparound(False) +def read_nullable_array(ResponseBuffer buffer, array_type: str, unsigned long long num_rows, object null_obj): + if num_rows == 0: + return () + cdef unsigned long long x = 0 + cdef size_t item_size = struct.calcsize(array_type) + cdef cvarray cy_array = cvarray((num_rows,), item_size, array_type, mode='c', allocate_buffer=False) + + # We have to make a copy of the incoming null map because the next + # "read_byes_c" call could invalidate our pointer by replacing the underlying buffer + cdef char * null_map = <char *>PyMem_Malloc(<size_t>num_rows) + memcpy(<void *>null_map, <void *>buffer.read_bytes_c(num_rows), num_rows) + + cy_array.data = buffer.read_bytes_c(num_rows * item_size) + cdef object column = tuple(memoryview(cy_array)) + for x in range(num_rows): + if null_map[x] != 0: + Py_DECREF(column[x]) + Py_INCREF(null_obj) + PyTuple_SET_ITEM(column, x, null_obj) + PyMem_Free(<void *>null_map) + return column + + +@cython.boundscheck(False) +@cython.wraparound(False) +def build_nullable_column(source: Sequence, char * null_map, object null_obj): + cdef unsigned long long num_rows = len(source), x + cdef object column = PyTuple_New(num_rows), v + for x in range(num_rows): + if null_map[x] == 0: + v = source[x] + else: + v = null_obj + Py_INCREF(v) + PyTuple_SET_ITEM(column, x, v) + return column + + +@cython.boundscheck(False) +@cython.wraparound(False) +def build_lc_nullable_column(index: Sequence, keys: array.array, object null_obj): + cdef unsigned long long 
num_rows = len(keys), x, y + cdef object column = PyTuple_New(num_rows), v + for x in range(num_rows): + y = keys[x] + if y == 0: + v = null_obj + else: + v = index[y] + Py_INCREF(v) + PyTuple_SET_ITEM(column, x, v) + return column + + +@cython.boundscheck(False) +@cython.wraparound(False) +cdef inline extend_byte_array(target: bytearray, int start, object source, Py_ssize_t sz): + PyByteArray_Resize(target, start + sz) + target[start:start + sz] = source[0:sz] + + +@cython.boundscheck(False) +@cython.wraparound(False) +def write_str_col(column: Sequence, encoding: Optional[str], dest: bytearray): + cdef unsigned long long buff_size = len(column) << 5 + cdef unsigned long long buff_loc = 0, sz = 0, dsz = 0 + cdef unsigned long long array_size = PyByteArray_GET_SIZE(dest) + cdef char * temp_buff = <char *>PyMem_Malloc(<size_t>buff_size) + cdef object mv = PyMemoryView_FromMemory(temp_buff, buff_size, PyBUF_READ) + cdef object encoded + cdef char b + cdef char * data + for x in column: + if not x: + temp_buff[buff_loc] = 0 + buff_loc += 1 + if buff_loc == buff_size: + extend_byte_array(dest, array_size, mv, buff_loc) + array_size += buff_loc + buff_loc = 0 + else: + if not encoding: + data = x + dsz = len(x) + else: + encoded = x.encode(encoding) + dsz = len(encoded) + data = encoded + sz = dsz + while True: + b = sz & 0x7f + sz >>= 7 + if sz != 0: + b |= 0x80 + temp_buff[buff_loc] = b + buff_loc += 1 + if buff_loc == buff_size: + extend_byte_array(dest, array_size, mv, buff_loc) + array_size += buff_loc + buff_loc = 0 + if sz == 0: + break + if dsz + buff_loc >= buff_size: + if buff_loc > 0: # Write what we have so far + extend_byte_array(dest, array_size, mv, buff_loc) + array_size += buff_loc + buff_loc = 0 + if (dsz << 4) > buff_size: # resize our buffer for very large strings + PyMem_Free(<void *> temp_buff) + mv.release() + buff_size = dsz << 6 + temp_buff = <char *> PyMem_Malloc(<size_t> buff_size) + mv = PyMemoryView_FromMemory(temp_buff, buff_size, PyBUF_READ) + memcpy(temp_buff + buff_loc, data, dsz) + buff_loc += dsz + if buff_loc > 0: + extend_byte_array(dest, array_size, mv, buff_loc) + mv.release() + PyMem_Free(<void *>temp_buff) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driverc/npconv.pyx b/contrib/python/clickhouse-connect/clickhouse_connect/driverc/npconv.pyx new file mode 100644 index 0000000000..87f4981392 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driverc/npconv.pyx @@ -0,0 +1,13 @@ +import cython + +import numpy as np + +from .buffer cimport ResponseBuffer + +@cython.boundscheck(False) +@cython.wraparound(False) +def read_numpy_array(ResponseBuffer buffer, np_type: str, unsigned long long num_rows): + dtype = np.dtype(np_type) + cdef sz = dtype.itemsize * num_rows + cdef char * source = buffer.read_bytes_c(dtype.itemsize * num_rows) + return np.frombuffer(source[:sz], dtype, num_rows) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/entry_points.py b/contrib/python/clickhouse-connect/clickhouse_connect/entry_points.py new file mode 100644 index 0000000000..9981e98a39 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/entry_points.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 + +# This script is used for validating installed entrypoints. 
Note that it fails on Python 3.7 +import sys + +from importlib.metadata import PackageNotFoundError, distribution + +EXPECTED_EPS = {'sqlalchemy.dialects:clickhousedb', + 'sqlalchemy.dialects:clickhousedb.connect'} + + +def validate_entrypoints(): + expected_eps = EXPECTED_EPS.copy() + try: + dist = distribution('clickhouse-connect') + except PackageNotFoundError: + print ('\nClickHouse Connect package not found in this Python installation') + return -1 + print() + for entry_point in dist.entry_points: + name = f'{entry_point.group}:{entry_point.name}' + print(f' {name}={entry_point.value}') + try: + expected_eps.remove(name) + except KeyError: + print (f'\nUnexpected entry point {name} found') + return -1 + if expected_eps: + print() + for name in expected_eps: + print (f'Did not find expected ep {name}') + return -1 + print ('\nEntrypoints correctly installed') + return 0 + + +if __name__ == '__main__': + sys.exit(validate_entrypoints()) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/json_impl.py b/contrib/python/clickhouse-connect/clickhouse_connect/json_impl.py new file mode 100644 index 0000000000..686ddf3955 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/json_impl.py @@ -0,0 +1,50 @@ +import logging +import json as py_json +from collections import OrderedDict +from typing import Any + +try: + import orjson + any_to_json = orjson.dumps # pylint: disable=no-member +except ImportError: + orjson = None + +try: + import ujson + + def _ujson_to_json(obj: Any) -> bytes: + return ujson.dumps(obj).encode() # pylint: disable=c-extension-no-member +except ImportError: + ujson = None + _ujson_to_json = None + + +def _pyjson_to_json(obj: Any) -> bytes: + return py_json.dumps(obj, separators=(',', ':')).encode() + + +logger = logging.getLogger(__name__) +_to_json = OrderedDict() +_to_json['orjson'] = orjson.dumps if orjson else None # pylint: disable=no-member +_to_json['ujson'] = _ujson_to_json if ujson else None +_to_json['python'] = _pyjson_to_json + +any_to_json = _pyjson_to_json + + +def set_json_library(impl: str = None): + global any_to_json # pylint: disable=global-statement + if impl: + func = _to_json.get(impl) + if func: + any_to_json = func + return + raise NotImplementedError(f'JSON library {impl} is not supported') + for library, func in _to_json.items(): + if func: + logger.debug('Using %s library for writing JSON byte strings', library) + any_to_json = func + break + + +set_json_library() diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/py.typed b/contrib/python/clickhouse-connect/clickhouse_connect/py.typed new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/py.typed diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/tools/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/tools/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/tools/__init__.py diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/tools/datagen.py b/contrib/python/clickhouse-connect/clickhouse_connect/tools/datagen.py new file mode 100644 index 0000000000..490d852916 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/tools/datagen.py @@ -0,0 +1,208 @@ +import struct +import uuid +from decimal import Decimal as PyDecimal +from ipaddress import IPv4Address, IPv6Address +from random import random, choice +from typing import Sequence, 
Union, NamedTuple, Callable, Type, Dict +from datetime import date, datetime, timedelta, tzinfo + +import pytz + +from clickhouse_connect.datatypes.base import ClickHouseType +from clickhouse_connect.datatypes.container import Array, Tuple, Map, Nested +from clickhouse_connect.datatypes.network import IPv4, IPv6 +from clickhouse_connect.datatypes.numeric import BigInt, Float32, Float64, Enum, Bool, Boolean, Decimal +from clickhouse_connect.datatypes.registry import get_from_name +from clickhouse_connect.datatypes.special import UUID +from clickhouse_connect.datatypes.string import String, FixedString +from clickhouse_connect.datatypes.temporal import Date, Date32, DateTime, DateTime64 +from clickhouse_connect.driver.common import array_sizes + +dt_from_ts = datetime.utcfromtimestamp +dt_from_ts_tz = datetime.fromtimestamp +epoch_date = date(1970, 1, 1) +date32_start_date = date(1925, 1, 1) + + +class RandomValueDef(NamedTuple): + """ + Parameter object to control the generation of random data values for testing + """ + server_tz: tzinfo = pytz.UTC + null_pct: float = 0.15 + str_len: int = 200 + arr_len: int = 12 + ascii_only: bool = False + + +def random_col_data(ch_type: Union[str, ClickHouseType], cnt: int, col_def: RandomValueDef = RandomValueDef()): + """ + Generate a column of random data for insert tests + :param ch_type: ClickHouseType or ClickHouse type name + :param cnt: Number of values to generate + :param col_def: Parameters to use for random data generation + :return: A tuple of length cnt of random Python data values of the requested ClickHouseType + """ + if isinstance(ch_type, str): + ch_type = get_from_name(ch_type) + gen = random_value_gen(ch_type, col_def) + if ch_type.nullable: + x = col_def.null_pct + return tuple(gen() if random() > x else None for _ in range(cnt)) + return tuple(gen() for _ in range(cnt)) + + +# pylint: disable=too-many-return-statements,too-many-branches,protected-access +def random_value_gen(ch_type: ClickHouseType, col_def: RandomValueDef): + """ + Returns a generator function of random values of the requested ClickHouseType + :param ch_type: ClickHouseType to generate + :param col_def: Parameters for the generated values + :return: Function or lambda that will return a random value of the requested type + """ + if ch_type.__class__ in gen_map: + return gen_map[ch_type.__class__] + if isinstance(ch_type, BigInt) or ch_type.python_type == int: + if isinstance(ch_type, BigInt): + sz = 2 ** (ch_type.byte_size * 8) + signed = ch_type._signed + else: + sz = 2 ** (array_sizes[ch_type._array_type.lower()] * 8) + signed = ch_type._array_type == ch_type._array_type.lower() + if signed: + sub = sz >> 1 + return lambda: int(random() * sz) - sub + return lambda: int(random() * sz) + if isinstance(ch_type, Array): + return lambda: list(random_col_data(ch_type.element_type, int(random() * col_def.arr_len), col_def)) + if isinstance(ch_type, Decimal): + return lambda: random_decimal(ch_type.prec, ch_type.scale) + if isinstance(ch_type, Map): + return lambda: random_map(ch_type.key_type, ch_type.value_type, int(random() * col_def.arr_len), col_def) + if isinstance(ch_type, Tuple): + return lambda: random_tuple(ch_type.element_types, col_def) + if isinstance(ch_type, Enum): + keys = list(ch_type._name_map.keys()) + return lambda: choice(keys) + if isinstance(ch_type, Nested): + return lambda: random_nested(ch_type.element_names, ch_type.element_types, col_def) + if isinstance(ch_type, String): + if col_def.ascii_only: + return lambda: 
random_ascii_str(col_def.str_len) + return lambda: random_utf8_str(col_def.str_len) + if isinstance(ch_type, FixedString): + return lambda: bytes((int(random() * 256) for _ in range(ch_type.byte_size))) + if isinstance(ch_type, DateTime): + if col_def.server_tz == pytz.UTC: + return random_datetime + timezone = col_def.server_tz + return lambda: random_datetime_tz(timezone) + if isinstance(ch_type, DateTime64): + prec = ch_type.prec + if col_def.server_tz == pytz.UTC: + return lambda: random_datetime64(prec) + timezone = col_def.server_tz + return lambda: random_datetime64_tz(prec, timezone) + raise ValueError(f'Invalid ClickHouse type {ch_type.name} for random column data') + + +def random_float(): + return (random() * random() * 65536) / (random() * (random() * 256 - 128)) + + +def random_float32(): + f64 = (random() * random() * 65536) / (random() * (random() * 256 - 128)) + return struct.unpack('f', struct.pack('f', f64))[0] + + +def random_decimal(prec: int, scale: int): + digits = ''.join(str(int(random() * 12000000000)) for _ in range(prec // 10 + 1)).rjust(prec, '0')[:prec] + sign = '' if ord(digits[0]) & 0x01 else '-' + if scale == 0: + return PyDecimal(f'{sign}{digits}') + return PyDecimal(f'{sign}{digits[:-scale]}.{digits[-scale:]}') + + +def random_tuple(element_types: Sequence[ClickHouseType], col_def): + return tuple(random_value_gen(x, col_def)() for x in element_types) + + +def random_map(key_type, value_type, sz: int, col_def): + keys = random_col_data(key_type, sz, col_def) + values = random_col_data(value_type, sz, col_def) + return dict(zip(keys, values)) + + +def random_datetime(): + return dt_from_ts(int(random() * 2 ** 32)).replace(microsecond=0) + + +def random_datetime_tz(timezone: tzinfo): + return dt_from_ts_tz(int(random() * 2 ** 32), timezone).replace(microsecond=0) + + +def random_ascii_str(max_len: int = 200, min_len: int = 0): + return ''.join((chr(int(random() * 95) + 32) for _ in range(int(random() * (max_len - min_len)) + min_len))) + + +def random_utf8_str(max_len: int = 200): + random_chars = [chr(int(random() * 65000) + 32) for _ in range(int(random() * max_len))] + return ''.join((c for c in random_chars if c.isprintable())) + + +def fixed_len_ascii_str(str_len: int = 200): + return ''.join((chr(int(random() * 95) + 32) for _ in range(str_len))) + + +# Only accepts precisions in multiples of 3 because others are extremely unlikely to be actually used +def random_datetime64(prec: int): + if prec == 1: + u_sec = 0 + elif prec == 1000: + u_sec = int(random() * 1000) * 1000 + else: + u_sec = int(random() * 1000000) + return dt_from_ts(int(random() * 4294967296)).replace(microsecond=u_sec) + + +def random_datetime64_tz(prec: int, timezone: tzinfo): + if prec == 1: + u_sec = 0 + elif prec == 1000: + u_sec = int(random() * 1000) * 1000 + else: + u_sec = int(random() * 1000000) + return dt_from_ts_tz(int(random() * 4294967296), timezone).replace(microsecond=u_sec) + + +def random_ipv6(): + if random() > 0.2: + # multiple randoms because of random float multiply limitations + ip_int = (int(random() * 4294967296) << 96) | (int(random() * 4294967296)) | ( + int(random() * 4294967296) << 32) | ( int(random() * 4294967296) << 64) + return IPv6Address(ip_int) + return IPv4Address(int(random() * 2 ** 32)) + + +def random_nested(keys: Sequence[str], types: Sequence[ClickHouseType], col_def: RandomValueDef): + sz = int(random() * col_def.arr_len) // 2 + row = [] + for _ in range(sz): + nested_element = {} + for name, col_type in zip(keys, types): + 
nested_element[name] = random_value_gen(col_type, col_def)() + row.append(nested_element) + return row + + +gen_map: Dict[Type[ClickHouseType], Callable] = { + Float64: random_float, + Float32: random_float32, + Date: lambda: epoch_date + timedelta(days=int(random() * 65536)), + Date32: lambda: date32_start_date + timedelta(days=random() * 130000), + UUID: uuid.uuid4, + IPv4: lambda: IPv4Address(int(random() * 4294967296)), + IPv6: random_ipv6, + Boolean: lambda: random() > .5, + Bool: lambda: random() > .5 +} diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/tools/testing.py b/contrib/python/clickhouse-connect/clickhouse_connect/tools/testing.py new file mode 100644 index 0000000000..f30b3f754a --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/tools/testing.py @@ -0,0 +1,52 @@ +from typing import Sequence, Optional, Union, Dict, Any + +from clickhouse_connect.driver import Client +from clickhouse_connect.driver.query import format_query_value + + +class TableContext: + def __init__(self, client: Client, + table: str, + columns: Union[str, Sequence[str]], + column_types: Optional[Sequence[str]] = None, + engine: str = 'MergeTree', + order_by: str = None, + settings: Optional[Dict[str, Any]] = None): + self.client = client + self.table = table + self.settings = settings + if isinstance(columns, str): + columns = columns.split(',') + if column_types is None: + self.column_names = [] + self.column_types = [] + for col in columns: + col = col.strip() + ix = col.find(' ') + self.column_types.append(col[ix + 1:].strip()) + self.column_names.append(col[:ix].strip()) + else: + self.column_names = columns + self.column_types = column_types + self.engine = engine + self.order_by = self.column_names[0] if order_by is None else order_by + + def __enter__(self): + if self.client.min_version('19'): + self.client.command(f'DROP TABLE IF EXISTS {self.table}') + else: + self.client.command(f'DROP TABLE IF EXISTS {self.table} SYNC') + col_defs = ','.join(f'{name} {col_type}' for name, col_type in zip(self.column_names, self.column_types)) + create_cmd = f'CREATE TABLE {self.table} ({col_defs}) ENGINE {self.engine} ORDER BY {self.order_by}' + if self.settings: + create_cmd += ' SETTINGS ' + for key, value in self.settings.items(): + + create_cmd += f'{key} = {format_query_value(value)}, ' + if create_cmd.endswith(', '): + create_cmd = create_cmd[:-2] + self.client.command(create_cmd) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.client.command(f'DROP TABLE IF EXISTS {self.table}') diff --git a/contrib/python/clickhouse-connect/ya.make b/contrib/python/clickhouse-connect/ya.make new file mode 100644 index 0000000000..db1563c6b8 --- /dev/null +++ b/contrib/python/clickhouse-connect/ya.make @@ -0,0 +1,100 @@ +# Generated by devtools/yamaker (pypi). 
+
+PY3_LIBRARY()
+
+VERSION(0.6.18)
+
+LICENSE(Apache-2.0)
+
+PEERDIR(
+    contrib/python/certifi
+    contrib/python/lz4
+    contrib/python/pytz
+    contrib/python/urllib3
+    contrib/python/zstandard
+)
+
+ADDINCL(
+    FOR cython contrib/python/clickhouse-connect
+)
+
+NO_COMPILER_WARNINGS()
+
+NO_LINT()
+
+PY_SRCS(
+    TOP_LEVEL
+    clickhouse_connect/__init__.py
+    clickhouse_connect/__version__.py
+    clickhouse_connect/cc_sqlalchemy/__init__.py
+    clickhouse_connect/cc_sqlalchemy/datatypes/__init__.py
+    clickhouse_connect/cc_sqlalchemy/datatypes/base.py
+    clickhouse_connect/cc_sqlalchemy/datatypes/sqltypes.py
+    clickhouse_connect/cc_sqlalchemy/ddl/__init__.py
+    clickhouse_connect/cc_sqlalchemy/ddl/custom.py
+    clickhouse_connect/cc_sqlalchemy/ddl/tableengine.py
+    clickhouse_connect/cc_sqlalchemy/dialect.py
+    clickhouse_connect/cc_sqlalchemy/inspector.py
+    clickhouse_connect/cc_sqlalchemy/sql/__init__.py
+    clickhouse_connect/cc_sqlalchemy/sql/ddlcompiler.py
+    clickhouse_connect/cc_sqlalchemy/sql/preparer.py
+    clickhouse_connect/common.py
+    clickhouse_connect/datatypes/__init__.py
+    clickhouse_connect/datatypes/base.py
+    clickhouse_connect/datatypes/container.py
+    clickhouse_connect/datatypes/format.py
+    clickhouse_connect/datatypes/network.py
+    clickhouse_connect/datatypes/numeric.py
+    clickhouse_connect/datatypes/registry.py
+    clickhouse_connect/datatypes/special.py
+    clickhouse_connect/datatypes/string.py
+    clickhouse_connect/datatypes/temporal.py
+    clickhouse_connect/dbapi/__init__.py
+    clickhouse_connect/dbapi/connection.py
+    clickhouse_connect/dbapi/cursor.py
+    clickhouse_connect/driver/__init__.py
+    clickhouse_connect/driver/buffer.py
+    clickhouse_connect/driver/client.py
+    clickhouse_connect/driver/common.py
+    clickhouse_connect/driver/compression.py
+    clickhouse_connect/driver/constants.py
+    clickhouse_connect/driver/context.py
+    clickhouse_connect/driver/ctypes.py
+    clickhouse_connect/driver/dataconv.py
+    clickhouse_connect/driver/ddl.py
+    clickhouse_connect/driver/exceptions.py
+    clickhouse_connect/driver/external.py
+    clickhouse_connect/driver/httpclient.py
+    clickhouse_connect/driver/httputil.py
+    clickhouse_connect/driver/insert.py
+    clickhouse_connect/driver/models.py
+    clickhouse_connect/driver/npconv.py
+    clickhouse_connect/driver/npquery.py
+    clickhouse_connect/driver/options.py
+    clickhouse_connect/driver/parser.py
+    clickhouse_connect/driver/query.py
+    clickhouse_connect/driver/summary.py
+    clickhouse_connect/driver/tools.py
+    clickhouse_connect/driver/transform.py
+    clickhouse_connect/driver/types.py
+    clickhouse_connect/driverc/__init__.py
+    clickhouse_connect/entry_points.py
+    clickhouse_connect/json_impl.py
+    clickhouse_connect/tools/__init__.py
+    clickhouse_connect/tools/datagen.py
+    clickhouse_connect/tools/testing.py
+    CYTHON_CPP
+    clickhouse_connect/driverc/buffer.pyx
+    clickhouse_connect/driverc/dataconv.pyx
+    clickhouse_connect/driverc/npconv.pyx
+)
+
+RESOURCE_FILES(
+    PREFIX contrib/python/clickhouse-connect/
+    .dist-info/METADATA
+    .dist-info/entry_points.txt
+    .dist-info/top_level.txt
+    clickhouse_connect/py.typed
+)
+
+END()
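As a quick illustration of the parameter handling added in `clickhouse_connect/driver/query.py` above, the sketch below shows roughly how `quote_identifier`, `format_query_value` and `bind_query` behave. The table and parameter names are hypothetical, and the expected values in the comments are inferred from the code in this diff rather than from any documentation.

```
from clickhouse_connect.driver.query import bind_query, format_query_value, quote_identifier

# Identifiers are wrapped in backticks unless they are already quoted
print(quote_identifier('events'))        # `events`

# Python values are rendered as ClickHouse literals; quotes are backslash-escaped
print(format_query_value(None))          # NULL
print(format_query_value("it's"))        # 'it\'s'
print(format_query_value(['a', 'b']))    # ['a', 'b']

# Without server-side {name:Type} bindings, parameters are interpolated client-side
query, bind_params = bind_query('SELECT * FROM events WHERE id = %(id)s', {'id': 42})
print(query)        # SELECT * FROM events WHERE id = 42
print(bind_params)  # {} -- nothing is sent as separate query parameters in this case
```

When the query does contain `{name:Type}` placeholders, `bind_query` instead leaves the query text untouched and returns `param_*` values formatted by `format_bind_value` for server-side binding, as implemented in the diff above.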