diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-07-23 07:47:34 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-07-23 07:56:54 +0300 |
commit | e9497f65d8bbe478a583fa31b74e03a8d342ff5f (patch) | |
tree | 48793f79ab80ae9a82e77744ed89f82aeee8e59a /contrib/python/clickhouse-connect/clickhouse_connect | |
parent | 973312112362cf03ce0f7d4e5fdbcb63d1166670 (diff) | |
download | ydb-e9497f65d8bbe478a583fa31b74e03a8d342ff5f.tar.gz |
Intermediate changes
Diffstat (limited to 'contrib/python/clickhouse-connect/clickhouse_connect')
5 files changed, 753 insertions, 6 deletions
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/__init__.py index 81ee6dd3a0..737aed5b34 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/__init__.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/__init__.py @@ -1,6 +1,6 @@ -from clickhouse_connect.driver import create_client - +from clickhouse_connect.driver import create_client, create_async_client driver_name = 'clickhousedb' get_client = create_client +get_async_client = create_async_client diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py b/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py index 8dec87cb67..d9a123ac6c 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py @@ -1 +1 @@ -version = '0.7.15' +version = '0.7.16' diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py index 2f0b11b132..27e16733d8 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py @@ -1,3 +1,4 @@ +import asyncio from inspect import signature from typing import Optional, Union, Dict, Any from urllib.parse import urlparse, parse_qs @@ -7,6 +8,7 @@ from clickhouse_connect.driver.client import Client from clickhouse_connect.driver.common import dict_copy from clickhouse_connect.driver.exceptions import ProgrammingError from clickhouse_connect.driver.httpclient import HttpClient +from clickhouse_connect.driver.asyncclient import AsyncClient # pylint: disable=too-many-arguments,too-many-locals,too-many-branches @@ -68,6 +70,7 @@ def create_client(*, :param server_host_name This is the server host name that will be checked against a TLS certificate for validity. This option can be used if using an ssh_tunnel or other indirect means to an ClickHouse server where the `host` argument refers to the tunnel or proxy and not the actual ClickHouse server + :param autogenerate_session_id If set, this will override the 'autogenerate_session_id' common setting. :return: ClickHouse Connect Client instance """ if dsn: @@ -117,3 +120,78 @@ def default_port(interface: str, secure: bool): if interface.startswith('http'): return 8443 if secure else 8123 raise ValueError('Unrecognized ClickHouse interface') + + +async def create_async_client(*, + host: str = None, + username: str = None, + password: str = '', + database: str = '__default__', + interface: Optional[str] = None, + port: int = 0, + secure: Union[bool, str] = False, + dsn: Optional[str] = None, + settings: Optional[Dict[str, Any]] = None, + generic_args: Optional[Dict[str, Any]] = None, + **kwargs) -> AsyncClient: + """ + The preferred method to get an async ClickHouse Connect Client instance. + For sync version, see create_client. + + Unlike sync version, the 'autogenerate_session_id' setting by default is False. + + :param host: The hostname or IP address of the ClickHouse server. If not set, localhost will be used. + :param username: The ClickHouse username. If not set, the default ClickHouse user will be used. + :param password: The password for username. + :param database: The default database for the connection. If not set, ClickHouse Connect will use the + default database for username. + :param interface: Must be http or https. Defaults to http, or to https if port is set to 8443 or 443 + :param port: The ClickHouse HTTP or HTTPS port. If not set will default to 8123, or to 8443 if secure=True + or interface=https. + :param secure: Use https/TLS. This overrides inferred values from the interface or port arguments. + :param dsn: A string in standard DSN (Data Source Name) format. Other connection values (such as host or user) + will be extracted from this string if not set otherwise. + :param settings: ClickHouse server settings to be used with the session/every request + :param generic_args: Used internally to parse DBAPI connection strings into keyword arguments and ClickHouse settings. + It is not recommended to use this parameter externally + :param kwargs -- Recognized keyword arguments (used by the HTTP client), see below + + :param compress: Enable compression for ClickHouse HTTP inserts and query results. True will select the preferred + compression method (lz4). A str of 'lz4', 'zstd', 'brotli', or 'gzip' can be used to use a specific compression type + :param query_limit: Default LIMIT on returned rows. 0 means no limit + :param connect_timeout: Timeout in seconds for the http connection + :param send_receive_timeout: Read timeout in seconds for http connection + :param client_name: client_name prepended to the HTTP User Agent header. Set this to track client queries + in the ClickHouse system.query_log. + :param send_progress: Deprecated, has no effect. Previous functionality is now automatically determined + :param verify: Verify the server certificate in secure/https mode + :param ca_cert: If verify is True, the file path to Certificate Authority root to validate ClickHouse server + certificate, in .pem format. Ignored if verify is False. This is not necessary if the ClickHouse server + certificate is trusted by the operating system. To trust the maintained list of "global" public root + certificates maintained by the Python 'certifi' package, set ca_cert to 'certifi' + :param client_cert: File path to a TLS Client certificate in .pem format. This file should contain any + applicable intermediate certificates + :param client_cert_key: File path to the private key for the Client Certificate. Required if the private key + is not included the Client Certificate key file + :param session_id ClickHouse session id. If not specified and the common setting 'autogenerate_session_id' + is True, the client will generate a UUID1 session id + :param pool_mgr Optional urllib3 PoolManager for this client. Useful for creating separate connection + pools for multiple client endpoints for applications with many clients + :param http_proxy http proxy address. Equivalent to setting the HTTP_PROXY environment variable + :param https_proxy https proxy address. Equivalent to setting the HTTPS_PROXY environment variable + :param server_host_name This is the server host name that will be checked against a TLS certificate for + validity. This option can be used if using an ssh_tunnel or other indirect means to an ClickHouse server + where the `host` argument refers to the tunnel or proxy and not the actual ClickHouse server + :param autogenerate_session_id If set, this will override the 'autogenerate_session_id' common setting. + :return: ClickHouse Connect Client instance + """ + + def _create_client(): + if 'autogenerate_session_id' not in kwargs: + kwargs['autogenerate_session_id'] = False + return create_client(host=host, username=username, password=password, database=database, interface=interface, + port=port, secure=secure, dsn=dsn, settings=settings, generic_args=generic_args, **kwargs) + + loop = asyncio.get_running_loop() + _client = await loop.run_in_executor(None, _create_client) + return AsyncClient(client=_client) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py new file mode 100644 index 0000000000..1a4ee7e429 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py @@ -0,0 +1,663 @@ +import asyncio +import io +from datetime import tzinfo +from typing import Optional, Union, Dict, Any, Sequence, Iterable, Generator, BinaryIO + +from clickhouse_connect.driver.client import Client +from clickhouse_connect.driver.common import StreamContext +from clickhouse_connect.driver.external import ExternalData +from clickhouse_connect.driver.query import QueryContext, QueryResult +from clickhouse_connect.driver.summary import QuerySummary +from clickhouse_connect.datatypes.base import ClickHouseType +from clickhouse_connect.driver.insert import InsertContext + + +# pylint: disable=too-many-public-methods, too-many-instance-attributes, too-many-arguments, too-many-locals +class AsyncClient: + """ + AsyncClient is a wrapper around the ClickHouse Client object that allows for async calls to the ClickHouse server. + Internally, each of the methods that uses IO is wrapped in a call to EventLoop.run_in_executor. + """ + + def __init__(self, *, client: Client): + self.client = client + + def set_client_setting(self, key, value): + """ + Set a clickhouse setting for the client after initialization. If a setting is not recognized by ClickHouse, + or the setting is identified as "read_only", this call will either throw a Programming exception or attempt + to send the setting anyway based on the common setting 'invalid_setting_action'. + :param key: ClickHouse setting name + :param value: ClickHouse setting value + """ + self.client.set_client_setting(key=key, value=value) + + def get_client_setting(self, key) -> Optional[str]: + """ + :param key: The setting key + :return: The string value of the setting, if it exists, or None + """ + return self.client.get_client_setting(key=key) + + def min_version(self, version_str: str) -> bool: + """ + Determine whether the connected server is at least the submitted version + For Altinity Stable versions like 22.8.15.25.altinitystable + the last condition in the first list comprehension expression is added + :param version_str: A version string consisting of up to 4 integers delimited by dots + :return: True if version_str is greater than the server_version, False if less than + """ + return self.client.min_version(version_str) + + def close(self): + """ + Subclass implementation to close the connection to the server/deallocate the client + """ + self.client.close() + + async def query(self, + query: Optional[str] = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + column_oriented: Optional[bool] = None, + use_numpy: Optional[bool] = None, + max_str_len: Optional[int] = None, + context: QueryContext = None, + query_tz: Optional[Union[str, tzinfo]] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + external_data: Optional[ExternalData] = None) -> QueryResult: + """ + Main query method for SELECT, DESCRIBE and other SQL statements that return a result matrix. + For parameters, see the create_query_context method. + :return: QueryResult -- data and metadata from response + """ + + def _query(): + return self.client.query(query=query, parameters=parameters, settings=settings, query_formats=query_formats, + column_formats=column_formats, encoding=encoding, use_none=use_none, + column_oriented=column_oriented, use_numpy=use_numpy, max_str_len=max_str_len, + context=context, query_tz=query_tz, column_tzs=column_tzs, + external_data=external_data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _query) + return result + + async def query_column_block_stream(self, + query: Optional[str] = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + context: QueryContext = None, + query_tz: Optional[Union[str, tzinfo]] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + external_data: Optional[ExternalData] = None) -> StreamContext: + """ + Variation of main query method that returns a stream of column oriented blocks. + For parameters, see the create_query_context method. + :return: StreamContext -- Iterable stream context that returns column oriented blocks + """ + + def _query_column_block_stream(): + return self.client.query_column_block_stream(query=query, parameters=parameters, settings=settings, + query_formats=query_formats, column_formats=column_formats, + encoding=encoding, use_none=use_none, context=context, + query_tz=query_tz, column_tzs=column_tzs, + external_data=external_data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _query_column_block_stream) + return result + + async def query_row_block_stream(self, + query: Optional[str] = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + context: QueryContext = None, + query_tz: Optional[Union[str, tzinfo]] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + external_data: Optional[ExternalData] = None) -> StreamContext: + """ + Variation of main query method that returns a stream of row oriented blocks. + For parameters, see the create_query_context method. + :return: StreamContext -- Iterable stream context that returns blocks of rows + """ + + def _query_row_block_stream(): + return self.client.query_row_block_stream(query=query, parameters=parameters, settings=settings, + query_formats=query_formats, column_formats=column_formats, + encoding=encoding, use_none=use_none, context=context, + query_tz=query_tz, column_tzs=column_tzs, + external_data=external_data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _query_row_block_stream) + return result + + async def query_rows_stream(self, + query: Optional[str] = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + context: QueryContext = None, + query_tz: Optional[Union[str, tzinfo]] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + external_data: Optional[ExternalData] = None) -> StreamContext: + """ + Variation of main query method that returns a stream of row oriented blocks. + For parameters, see the create_query_context method. + :return: StreamContext -- Iterable stream context that returns blocks of rows + """ + + def _query_rows_stream(): + return self.client.query_rows_stream(query=query, parameters=parameters, settings=settings, + query_formats=query_formats, column_formats=column_formats, + encoding=encoding, use_none=use_none, context=context, + query_tz=query_tz, column_tzs=column_tzs, + external_data=external_data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _query_rows_stream) + return result + + async def raw_query(self, + query: str, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + fmt: str = None, + use_database: bool = True, + external_data: Optional[ExternalData] = None) -> bytes: + """ + Query method that simply returns the raw ClickHouse format bytes. + :param query: Query statement/format string + :param parameters: Optional dictionary used to format the query + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param fmt: ClickHouse output format + :param use_database Send the database parameter to ClickHouse so the command will be executed in the client + database context + :param external_data External data to send with the query + :return: bytes representing raw ClickHouse return value based on format + """ + + def _raw_query(): + return self.client.raw_query(query=query, parameters=parameters, settings=settings, fmt=fmt, + use_database=use_database, external_data=external_data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _raw_query) + return result + + async def raw_stream(self, query: str, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + fmt: str = None, + use_database: bool = True, + external_data: Optional[ExternalData] = None) -> io.IOBase: + """ + Query method that returns the result as an io.IOBase iterator. + :param query: Query statement/format string + :param parameters: Optional dictionary used to format the query + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param fmt: ClickHouse output format + :param use_database Send the database parameter to ClickHouse so the command will be executed in the client + database context + :param external_data External data to send with the query + :return: io.IOBase stream/iterator for the result + """ + + def _raw_stream(): + return self.client.raw_stream(query=query, parameters=parameters, settings=settings, fmt=fmt, + use_database=use_database, external_data=external_data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _raw_stream) + return result + + async def query_np(self, + query: Optional[str] = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, str]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + max_str_len: Optional[int] = None, + context: QueryContext = None, + external_data: Optional[ExternalData] = None): + """ + Query method that returns the results as a numpy array. + For parameter values, see the create_query_context method. + :return: Numpy array representing the result set + """ + + def _query_np(): + return self.client.query_np(query=query, parameters=parameters, settings=settings, + query_formats=query_formats, column_formats=column_formats, encoding=encoding, + use_none=use_none, max_str_len=max_str_len, context=context, + external_data=external_data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _query_np) + return result + + async def query_np_stream(self, + query: Optional[str] = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, str]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + max_str_len: Optional[int] = None, + context: QueryContext = None, + external_data: Optional[ExternalData] = None) -> StreamContext: + """ + Query method that returns the results as a stream of numpy arrays. + For parameter values, see the create_query_context method. + :return: Generator that yield a numpy array per block representing the result set + """ + + def _query_np_stream(): + return self.client.query_np_stream(query=query, parameters=parameters, settings=settings, + query_formats=query_formats, column_formats=column_formats, + encoding=encoding, use_none=use_none, max_str_len=max_str_len, + context=context, external_data=external_data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _query_np_stream) + return result + + async def query_df(self, + query: Optional[str] = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, str]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + max_str_len: Optional[int] = None, + use_na_values: Optional[bool] = None, + query_tz: Optional[str] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + context: QueryContext = None, + external_data: Optional[ExternalData] = None, + use_extended_dtypes: Optional[bool] = None): + """ + Query method that results the results as a pandas dataframe. + For parameter values, see the create_query_context method. + :return: Pandas dataframe representing the result set + """ + + def _query_df(): + return self.client.query_df(query=query, parameters=parameters, settings=settings, + query_formats=query_formats, column_formats=column_formats, encoding=encoding, + use_none=use_none, max_str_len=max_str_len, use_na_values=use_na_values, + query_tz=query_tz, column_tzs=column_tzs, context=context, + external_data=external_data, use_extended_dtypes=use_extended_dtypes) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _query_df) + return result + + async def query_df_stream(self, + query: Optional[str] = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, str]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + max_str_len: Optional[int] = None, + use_na_values: Optional[bool] = None, + query_tz: Optional[str] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + context: QueryContext = None, + external_data: Optional[ExternalData] = None, + use_extended_dtypes: Optional[bool] = None) -> StreamContext: + """ + Query method that returns the results as a StreamContext. + For parameter values, see the create_query_context method. + :return: Generator that yields a Pandas dataframe per block representing the result set + """ + + def _query_df_stream(): + return self.client.query_df_stream(query=query, parameters=parameters, settings=settings, + query_formats=query_formats, column_formats=column_formats, + encoding=encoding, + use_none=use_none, max_str_len=max_str_len, use_na_values=use_na_values, + query_tz=query_tz, column_tzs=column_tzs, context=context, + external_data=external_data, use_extended_dtypes=use_extended_dtypes) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _query_df_stream) + return result + + def create_query_context(self, + query: Optional[Union[str, bytes]] = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + column_oriented: Optional[bool] = None, + use_numpy: Optional[bool] = False, + max_str_len: Optional[int] = 0, + context: Optional[QueryContext] = None, + query_tz: Optional[Union[str, tzinfo]] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + use_na_values: Optional[bool] = None, + streaming: bool = False, + as_pandas: bool = False, + external_data: Optional[ExternalData] = None, + use_extended_dtypes: Optional[bool] = None) -> QueryContext: + """ + Creates or updates a reusable QueryContext object + :param query: Query statement/format string + :param parameters: Optional dictionary used to format the query + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param query_formats: See QueryContext __init__ docstring + :param column_formats: See QueryContext __init__ docstring + :param encoding: See QueryContext __init__ docstring + :param use_none: Use None for ClickHouse NULL instead of default values. Note that using None in Numpy + arrays will force the numpy array dtype to 'object', which is often inefficient. This effect also + will impact the performance of Pandas dataframes. + :param column_oriented: Deprecated. Controls orientation of the QueryResult result_set property + :param use_numpy: Return QueryResult columns as one-dimensional numpy arrays + :param max_str_len: Limit returned ClickHouse String values to this length, which allows a Numpy + structured array even with ClickHouse variable length String columns. If 0, Numpy arrays for + String columns will always be object arrays + :param context: An existing QueryContext to be updated with any provided parameter values + :param query_tz Either a string or a pytz tzinfo object. (Strings will be converted to tzinfo objects). + Values for any DateTime or DateTime64 column in the query will be converted to Python datetime.datetime + objects with the selected timezone + :param column_tzs A dictionary of column names to tzinfo objects (or strings that will be converted to + tzinfo objects). The timezone will be applied to datetime objects returned in the query + :param use_na_values: Deprecated alias for use_advanced_dtypes + :param as_pandas Return the result columns as pandas.Series objects + :param streaming Marker used to correctly configure streaming queries + :param external_data ClickHouse "external data" to send with query + :param use_extended_dtypes: Only relevant to Pandas Dataframe queries. Use Pandas "missing types", such as + pandas.NA and pandas.NaT for ClickHouse NULL values, as well as extended Pandas dtypes such as IntegerArray + and StringArray. Defaulted to True for query_df methods + :return: Reusable QueryContext + """ + + return self.client.create_query_context(query=query, parameters=parameters, settings=settings, + query_formats=query_formats, column_formats=column_formats, + encoding=encoding, use_none=use_none, + column_oriented=column_oriented, + use_numpy=use_numpy, max_str_len=max_str_len, context=context, + query_tz=query_tz, column_tzs=column_tzs, + use_na_values=use_na_values, + streaming=streaming, as_pandas=as_pandas, + external_data=external_data, + use_extended_dtypes=use_extended_dtypes) + + async def query_arrow(self, + query: str, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + use_strings: Optional[bool] = None, + external_data: Optional[ExternalData] = None): + """ + Query method using the ClickHouse Arrow format to return a PyArrow table + :param query: Query statement/format string + :param parameters: Optional dictionary used to format the query + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary) + :param external_data ClickHouse "external data" to send with query + :return: PyArrow.Table + """ + + def _query_arrow(): + return self.client.query_arrow(query=query, parameters=parameters, settings=settings, + use_strings=use_strings, external_data=external_data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _query_arrow) + return result + + async def query_arrow_stream(self, + query: str, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + use_strings: Optional[bool] = None, + external_data: Optional[ExternalData] = None) -> StreamContext: + """ + Query method that returns the results as a stream of Arrow tables + :param query: Query statement/format string + :param parameters: Optional dictionary used to format the query + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary) + :param external_data ClickHouse "external data" to send with query + :return: Generator that yields a PyArrow.Table for per block representing the result set + """ + + def _query_arrow_stream(): + return self.client.query_arrow_stream(query=query, parameters=parameters, settings=settings, + use_strings=use_strings, external_data=external_data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _query_arrow_stream) + return result + + async def command(self, + cmd: str, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + data: Union[str, bytes] = None, + settings: Dict[str, Any] = None, + use_database: bool = True, + external_data: Optional[ExternalData] = None) -> Union[str, int, Sequence[str], QuerySummary]: + """ + Client method that returns a single value instead of a result set + :param cmd: ClickHouse query/command as a python format string + :param parameters: Optional dictionary of key/values pairs to be formatted + :param data: Optional 'data' for the command (for INSERT INTO in particular) + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param use_database: Send the database parameter to ClickHouse so the command will be executed in the client + database context. Otherwise, no database will be specified with the command. This is useful for determining + the default user database + :param external_data ClickHouse "external data" to send with command/query + :return: Decoded response from ClickHouse as either a string, int, or sequence of strings, or QuerySummary + if no data returned + """ + + def _command(): + return self.client.command(cmd=cmd, parameters=parameters, data=data, settings=settings, + use_database=use_database, external_data=external_data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _command) + return result + + async def ping(self) -> bool: + """ + Validate the connection, does not throw an Exception (see debug logs) + :return: ClickHouse server is up and reachable + """ + + def _ping(): + return self.client.ping() + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _ping) + return result + + async def insert(self, + table: Optional[str] = None, + data: Sequence[Sequence[Any]] = None, + column_names: Union[str, Iterable[str]] = '*', + database: Optional[str] = None, + column_types: Sequence[ClickHouseType] = None, + column_type_names: Sequence[str] = None, + column_oriented: bool = False, + settings: Optional[Dict[str, Any]] = None, + context: InsertContext = None) -> QuerySummary: + """ + Method to insert multiple rows/data matrix of native Python objects. If context is specified arguments + other than data are ignored + :param table: Target table + :param data: Sequence of sequences of Python data + :param column_names: Ordered list of column names or '*' if column types should be retrieved from the + ClickHouse table definition + :param database: Target database -- will use client default database if not specified. + :param column_types: ClickHouse column types. If set then column data does not need to be retrieved from + the server + :param column_type_names: ClickHouse column type names. If set then column data does not need to be + retrieved from the server + :param column_oriented: If true the data is already "pivoted" in column form + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param context: Optional reusable insert context to allow repeated inserts into the same table with + different data batches + :return: QuerySummary with summary information, throws exception if insert fails + """ + + def _insert(): + return self.client.insert(table=table, data=data, column_names=column_names, database=database, + column_types=column_types, column_type_names=column_type_names, + column_oriented=column_oriented, settings=settings, context=context) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _insert) + return result + + async def insert_df(self, table: str = None, + df=None, + database: Optional[str] = None, + settings: Optional[Dict] = None, + column_names: Optional[Sequence[str]] = None, + column_types: Sequence[ClickHouseType] = None, + column_type_names: Sequence[str] = None, + context: InsertContext = None) -> QuerySummary: + """ + Insert a pandas DataFrame into ClickHouse. If context is specified arguments other than df are ignored + :param table: ClickHouse table + :param df: two-dimensional pandas dataframe + :param database: Optional ClickHouse database + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param column_names: An optional list of ClickHouse column names. If not set, the DataFrame column names + will be used + :param column_types: ClickHouse column types. If set then column data does not need to be retrieved from + the server + :param column_type_names: ClickHouse column type names. If set then column data does not need to be + retrieved from the server + :param context: Optional reusable insert context to allow repeated inserts into the same table with + different data batches + :return: QuerySummary with summary information, throws exception if insert fails + """ + + def _insert_df(): + return self.client.insert_df(table=table, df=df, database=database, settings=settings, + column_names=column_names, + column_types=column_types, column_type_names=column_type_names, + context=context) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _insert_df) + return result + + async def insert_arrow(self, table: str, + arrow_table, database: str = None, + settings: Optional[Dict] = None) -> QuerySummary: + """ + Insert a PyArrow table DataFrame into ClickHouse using raw Arrow format + :param table: ClickHouse table + :param arrow_table: PyArrow Table object + :param database: Optional ClickHouse database + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :return: QuerySummary with summary information, throws exception if insert fails + """ + + def _insert_arrow(): + return self.client.insert_arrow(table=table, arrow_table=arrow_table, database=database, settings=settings) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _insert_arrow) + return result + + async def create_insert_context(self, + table: str, + column_names: Optional[Union[str, Sequence[str]]] = None, + database: Optional[str] = None, + column_types: Sequence[ClickHouseType] = None, + column_type_names: Sequence[str] = None, + column_oriented: bool = False, + settings: Optional[Dict[str, Any]] = None, + data: Optional[Sequence[Sequence[Any]]] = None) -> InsertContext: + """ + Builds a reusable insert context to hold state for a duration of an insert + :param table: Target table + :param database: Target database. If not set, uses the client default database + :param column_names: Optional ordered list of column names. If not set, all columns ('*') will be assumed + in the order specified by the table definition + :param database: Target database -- will use client default database if not specified + :param column_types: ClickHouse column types. Optional Sequence of ClickHouseType objects. If neither column + types nor column type names are set, actual column types will be retrieved from the server. + :param column_type_names: ClickHouse column type names. Specified column types by name string + :param column_oriented: If true the data is already "pivoted" in column form + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param data: Initial dataset for insert + :return Reusable insert context + """ + + def _create_insert_context(): + return self.client.create_insert_context(table=table, column_names=column_names, database=database, + column_types=column_types, column_type_names=column_type_names, + column_oriented=column_oriented, settings=settings, data=data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _create_insert_context) + return result + + async def data_insert(self, context: InsertContext) -> QuerySummary: + """ + Subclass implementation of the data insert + :context: InsertContext parameter object + :return: No return, throws an exception if the insert fails + """ + + def _data_insert(): + return self.client.data_insert(context=context) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _data_insert) + return result + + async def raw_insert(self, table: str, + column_names: Optional[Sequence[str]] = None, + insert_block: Union[str, bytes, Generator[bytes, None, None], BinaryIO] = None, + settings: Optional[Dict] = None, + fmt: Optional[str] = None, + compression: Optional[str] = None) -> QuerySummary: + """ + Insert data already formatted in a bytes object + :param table: Table name (whether qualified with the database name or not) + :param column_names: Sequence of column names + :param insert_block: Binary or string data already in a recognized ClickHouse format + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param compression: Recognized ClickHouse `Accept-Encoding` header compression value + :param fmt: Valid clickhouse format + """ + + def _raw_insert(): + return self.client.raw_insert(table=table, column_names=column_names, insert_block=insert_block, + settings=settings, fmt=fmt, compression=compression) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _raw_insert) + return result diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py index 03b5bbe324..bf3c0d863f 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py @@ -69,7 +69,8 @@ class HttpClient(Client): https_proxy: Optional[str] = None, server_host_name: Optional[str] = None, apply_server_timezone: Optional[Union[str, bool]] = None, - show_clickhouse_errors: Optional[bool] = None): + show_clickhouse_errors: Optional[bool] = None, + autogenerate_session_id: Optional[bool] = None): """ Create an HTTP ClickHouse Connect client See clickhouse_connect.get_client for parameters @@ -93,7 +94,7 @@ class HttpClient(Client): # pylint: disable=too-many-boolean-expressions if not self.http and (server_host_name or ca_cert or client_cert or not verify or https_proxy): options = {'verify': verify is not False} - dict_add(options,'ca_cert', ca_cert) + dict_add(options, 'ca_cert', ca_cert) dict_add(options, 'client_cert', client_cert) dict_add(options, 'client_cert_key', client_cert_key) if server_host_name: @@ -128,9 +129,14 @@ class HttpClient(Client): self._progress_interval = None self._active_session = None + # allow to override the global autogenerate_session_id setting via the constructor params + _autogenerate_session_id = common.get_setting('autogenerate_session_id') \ + if autogenerate_session_id is None \ + else autogenerate_session_id + if session_id: ch_settings['session_id'] = session_id - elif 'session_id' not in ch_settings and common.get_setting('autogenerate_session_id'): + elif 'session_id' not in ch_settings and _autogenerate_session_id: ch_settings['session_id'] = str(uuid.uuid4()) if coerce_bool(compress): |