aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/clickhouse-connect/clickhouse_connect
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-07-23 07:47:34 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-07-23 07:56:54 +0300
commite9497f65d8bbe478a583fa31b74e03a8d342ff5f (patch)
tree48793f79ab80ae9a82e77744ed89f82aeee8e59a /contrib/python/clickhouse-connect/clickhouse_connect
parent973312112362cf03ce0f7d4e5fdbcb63d1166670 (diff)
downloadydb-e9497f65d8bbe478a583fa31b74e03a8d342ff5f.tar.gz
Intermediate changes
Diffstat (limited to 'contrib/python/clickhouse-connect/clickhouse_connect')
-rw-r--r--contrib/python/clickhouse-connect/clickhouse_connect/__init__.py4
-rw-r--r--contrib/python/clickhouse-connect/clickhouse_connect/__version__.py2
-rw-r--r--contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py78
-rw-r--r--contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py663
-rw-r--r--contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py12
5 files changed, 753 insertions, 6 deletions
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/__init__.py
index 81ee6dd3a0..737aed5b34 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/__init__.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/__init__.py
@@ -1,6 +1,6 @@
-from clickhouse_connect.driver import create_client
-
+from clickhouse_connect.driver import create_client, create_async_client
driver_name = 'clickhousedb'
get_client = create_client
+get_async_client = create_async_client
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py b/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py
index 8dec87cb67..d9a123ac6c 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py
@@ -1 +1 @@
-version = '0.7.15'
+version = '0.7.16'
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py
index 2f0b11b132..27e16733d8 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py
@@ -1,3 +1,4 @@
+import asyncio
from inspect import signature
from typing import Optional, Union, Dict, Any
from urllib.parse import urlparse, parse_qs
@@ -7,6 +8,7 @@ from clickhouse_connect.driver.client import Client
from clickhouse_connect.driver.common import dict_copy
from clickhouse_connect.driver.exceptions import ProgrammingError
from clickhouse_connect.driver.httpclient import HttpClient
+from clickhouse_connect.driver.asyncclient import AsyncClient
# pylint: disable=too-many-arguments,too-many-locals,too-many-branches
@@ -68,6 +70,7 @@ def create_client(*,
:param server_host_name This is the server host name that will be checked against a TLS certificate for
validity. This option can be used if using an ssh_tunnel or other indirect means to an ClickHouse server
where the `host` argument refers to the tunnel or proxy and not the actual ClickHouse server
+ :param autogenerate_session_id If set, this will override the 'autogenerate_session_id' common setting.
:return: ClickHouse Connect Client instance
"""
if dsn:
@@ -117,3 +120,78 @@ def default_port(interface: str, secure: bool):
if interface.startswith('http'):
return 8443 if secure else 8123
raise ValueError('Unrecognized ClickHouse interface')
+
+
+async def create_async_client(*,
+ host: str = None,
+ username: str = None,
+ password: str = '',
+ database: str = '__default__',
+ interface: Optional[str] = None,
+ port: int = 0,
+ secure: Union[bool, str] = False,
+ dsn: Optional[str] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ generic_args: Optional[Dict[str, Any]] = None,
+ **kwargs) -> AsyncClient:
+ """
+ The preferred method to get an async ClickHouse Connect Client instance.
+ For sync version, see create_client.
+
+ Unlike sync version, the 'autogenerate_session_id' setting by default is False.
+
+ :param host: The hostname or IP address of the ClickHouse server. If not set, localhost will be used.
+ :param username: The ClickHouse username. If not set, the default ClickHouse user will be used.
+ :param password: The password for username.
+ :param database: The default database for the connection. If not set, ClickHouse Connect will use the
+ default database for username.
+ :param interface: Must be http or https. Defaults to http, or to https if port is set to 8443 or 443
+ :param port: The ClickHouse HTTP or HTTPS port. If not set will default to 8123, or to 8443 if secure=True
+ or interface=https.
+ :param secure: Use https/TLS. This overrides inferred values from the interface or port arguments.
+ :param dsn: A string in standard DSN (Data Source Name) format. Other connection values (such as host or user)
+ will be extracted from this string if not set otherwise.
+ :param settings: ClickHouse server settings to be used with the session/every request
+ :param generic_args: Used internally to parse DBAPI connection strings into keyword arguments and ClickHouse settings.
+ It is not recommended to use this parameter externally
+ :param kwargs -- Recognized keyword arguments (used by the HTTP client), see below
+
+ :param compress: Enable compression for ClickHouse HTTP inserts and query results. True will select the preferred
+ compression method (lz4). A str of 'lz4', 'zstd', 'brotli', or 'gzip' can be used to use a specific compression type
+ :param query_limit: Default LIMIT on returned rows. 0 means no limit
+ :param connect_timeout: Timeout in seconds for the http connection
+ :param send_receive_timeout: Read timeout in seconds for http connection
+ :param client_name: client_name prepended to the HTTP User Agent header. Set this to track client queries
+ in the ClickHouse system.query_log.
+ :param send_progress: Deprecated, has no effect. Previous functionality is now automatically determined
+ :param verify: Verify the server certificate in secure/https mode
+ :param ca_cert: If verify is True, the file path to Certificate Authority root to validate ClickHouse server
+ certificate, in .pem format. Ignored if verify is False. This is not necessary if the ClickHouse server
+ certificate is trusted by the operating system. To trust the maintained list of "global" public root
+ certificates maintained by the Python 'certifi' package, set ca_cert to 'certifi'
+ :param client_cert: File path to a TLS Client certificate in .pem format. This file should contain any
+ applicable intermediate certificates
+ :param client_cert_key: File path to the private key for the Client Certificate. Required if the private key
+ is not included the Client Certificate key file
+ :param session_id ClickHouse session id. If not specified and the common setting 'autogenerate_session_id'
+ is True, the client will generate a UUID1 session id
+ :param pool_mgr Optional urllib3 PoolManager for this client. Useful for creating separate connection
+ pools for multiple client endpoints for applications with many clients
+ :param http_proxy http proxy address. Equivalent to setting the HTTP_PROXY environment variable
+ :param https_proxy https proxy address. Equivalent to setting the HTTPS_PROXY environment variable
+ :param server_host_name This is the server host name that will be checked against a TLS certificate for
+ validity. This option can be used if using an ssh_tunnel or other indirect means to an ClickHouse server
+ where the `host` argument refers to the tunnel or proxy and not the actual ClickHouse server
+ :param autogenerate_session_id If set, this will override the 'autogenerate_session_id' common setting.
+ :return: ClickHouse Connect Client instance
+ """
+
+ def _create_client():
+ if 'autogenerate_session_id' not in kwargs:
+ kwargs['autogenerate_session_id'] = False
+ return create_client(host=host, username=username, password=password, database=database, interface=interface,
+ port=port, secure=secure, dsn=dsn, settings=settings, generic_args=generic_args, **kwargs)
+
+ loop = asyncio.get_running_loop()
+ _client = await loop.run_in_executor(None, _create_client)
+ return AsyncClient(client=_client)
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py
new file mode 100644
index 0000000000..1a4ee7e429
--- /dev/null
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py
@@ -0,0 +1,663 @@
+import asyncio
+import io
+from datetime import tzinfo
+from typing import Optional, Union, Dict, Any, Sequence, Iterable, Generator, BinaryIO
+
+from clickhouse_connect.driver.client import Client
+from clickhouse_connect.driver.common import StreamContext
+from clickhouse_connect.driver.external import ExternalData
+from clickhouse_connect.driver.query import QueryContext, QueryResult
+from clickhouse_connect.driver.summary import QuerySummary
+from clickhouse_connect.datatypes.base import ClickHouseType
+from clickhouse_connect.driver.insert import InsertContext
+
+
+# pylint: disable=too-many-public-methods, too-many-instance-attributes, too-many-arguments, too-many-locals
+class AsyncClient:
+ """
+ AsyncClient is a wrapper around the ClickHouse Client object that allows for async calls to the ClickHouse server.
+ Internally, each of the methods that uses IO is wrapped in a call to EventLoop.run_in_executor.
+ """
+
+ def __init__(self, *, client: Client):
+ self.client = client
+
+ def set_client_setting(self, key, value):
+ """
+ Set a clickhouse setting for the client after initialization. If a setting is not recognized by ClickHouse,
+ or the setting is identified as "read_only", this call will either throw a Programming exception or attempt
+ to send the setting anyway based on the common setting 'invalid_setting_action'.
+ :param key: ClickHouse setting name
+ :param value: ClickHouse setting value
+ """
+ self.client.set_client_setting(key=key, value=value)
+
+ def get_client_setting(self, key) -> Optional[str]:
+ """
+ :param key: The setting key
+ :return: The string value of the setting, if it exists, or None
+ """
+ return self.client.get_client_setting(key=key)
+
+ def min_version(self, version_str: str) -> bool:
+ """
+ Determine whether the connected server is at least the submitted version
+ For Altinity Stable versions like 22.8.15.25.altinitystable
+ the last condition in the first list comprehension expression is added
+ :param version_str: A version string consisting of up to 4 integers delimited by dots
+ :return: True if version_str is greater than the server_version, False if less than
+ """
+ return self.client.min_version(version_str)
+
+ def close(self):
+ """
+ Subclass implementation to close the connection to the server/deallocate the client
+ """
+ self.client.close()
+
+ async def query(self,
+ query: Optional[str] = None,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ query_formats: Optional[Dict[str, str]] = None,
+ column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
+ encoding: Optional[str] = None,
+ use_none: Optional[bool] = None,
+ column_oriented: Optional[bool] = None,
+ use_numpy: Optional[bool] = None,
+ max_str_len: Optional[int] = None,
+ context: QueryContext = None,
+ query_tz: Optional[Union[str, tzinfo]] = None,
+ column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
+ external_data: Optional[ExternalData] = None) -> QueryResult:
+ """
+ Main query method for SELECT, DESCRIBE and other SQL statements that return a result matrix.
+ For parameters, see the create_query_context method.
+ :return: QueryResult -- data and metadata from response
+ """
+
+ def _query():
+ return self.client.query(query=query, parameters=parameters, settings=settings, query_formats=query_formats,
+ column_formats=column_formats, encoding=encoding, use_none=use_none,
+ column_oriented=column_oriented, use_numpy=use_numpy, max_str_len=max_str_len,
+ context=context, query_tz=query_tz, column_tzs=column_tzs,
+ external_data=external_data)
+
+ loop = asyncio.get_running_loop()
+ result = await loop.run_in_executor(None, _query)
+ return result
+
+ async def query_column_block_stream(self,
+ query: Optional[str] = None,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ query_formats: Optional[Dict[str, str]] = None,
+ column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
+ encoding: Optional[str] = None,
+ use_none: Optional[bool] = None,
+ context: QueryContext = None,
+ query_tz: Optional[Union[str, tzinfo]] = None,
+ column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
+ external_data: Optional[ExternalData] = None) -> StreamContext:
+ """
+ Variation of main query method that returns a stream of column oriented blocks.
+ For parameters, see the create_query_context method.
+ :return: StreamContext -- Iterable stream context that returns column oriented blocks
+ """
+
+ def _query_column_block_stream():
+ return self.client.query_column_block_stream(query=query, parameters=parameters, settings=settings,
+ query_formats=query_formats, column_formats=column_formats,
+ encoding=encoding, use_none=use_none, context=context,
+ query_tz=query_tz, column_tzs=column_tzs,
+ external_data=external_data)
+
+ loop = asyncio.get_running_loop()
+ result = await loop.run_in_executor(None, _query_column_block_stream)
+ return result
+
+ async def query_row_block_stream(self,
+ query: Optional[str] = None,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ query_formats: Optional[Dict[str, str]] = None,
+ column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
+ encoding: Optional[str] = None,
+ use_none: Optional[bool] = None,
+ context: QueryContext = None,
+ query_tz: Optional[Union[str, tzinfo]] = None,
+ column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
+ external_data: Optional[ExternalData] = None) -> StreamContext:
+ """
+ Variation of main query method that returns a stream of row oriented blocks.
+ For parameters, see the create_query_context method.
+ :return: StreamContext -- Iterable stream context that returns blocks of rows
+ """
+
+ def _query_row_block_stream():
+ return self.client.query_row_block_stream(query=query, parameters=parameters, settings=settings,
+ query_formats=query_formats, column_formats=column_formats,
+ encoding=encoding, use_none=use_none, context=context,
+ query_tz=query_tz, column_tzs=column_tzs,
+ external_data=external_data)
+
+ loop = asyncio.get_running_loop()
+ result = await loop.run_in_executor(None, _query_row_block_stream)
+ return result
+
+ async def query_rows_stream(self,
+ query: Optional[str] = None,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ query_formats: Optional[Dict[str, str]] = None,
+ column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
+ encoding: Optional[str] = None,
+ use_none: Optional[bool] = None,
+ context: QueryContext = None,
+ query_tz: Optional[Union[str, tzinfo]] = None,
+ column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
+ external_data: Optional[ExternalData] = None) -> StreamContext:
+ """
+ Variation of main query method that returns a stream of row oriented blocks.
+ For parameters, see the create_query_context method.
+ :return: StreamContext -- Iterable stream context that returns blocks of rows
+ """
+
+ def _query_rows_stream():
+ return self.client.query_rows_stream(query=query, parameters=parameters, settings=settings,
+ query_formats=query_formats, column_formats=column_formats,
+ encoding=encoding, use_none=use_none, context=context,
+ query_tz=query_tz, column_tzs=column_tzs,
+ external_data=external_data)
+
+ loop = asyncio.get_running_loop()
+ result = await loop.run_in_executor(None, _query_rows_stream)
+ return result
+
+ async def raw_query(self,
+ query: str,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ fmt: str = None,
+ use_database: bool = True,
+ external_data: Optional[ExternalData] = None) -> bytes:
+ """
+ Query method that simply returns the raw ClickHouse format bytes.
+ :param query: Query statement/format string
+ :param parameters: Optional dictionary used to format the query
+ :param settings: Optional dictionary of ClickHouse settings (key/string values)
+ :param fmt: ClickHouse output format
+ :param use_database Send the database parameter to ClickHouse so the command will be executed in the client
+ database context
+ :param external_data External data to send with the query
+ :return: bytes representing raw ClickHouse return value based on format
+ """
+
+ def _raw_query():
+ return self.client.raw_query(query=query, parameters=parameters, settings=settings, fmt=fmt,
+ use_database=use_database, external_data=external_data)
+
+ loop = asyncio.get_running_loop()
+ result = await loop.run_in_executor(None, _raw_query)
+ return result
+
+ async def raw_stream(self, query: str,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ fmt: str = None,
+ use_database: bool = True,
+ external_data: Optional[ExternalData] = None) -> io.IOBase:
+ """
+ Query method that returns the result as an io.IOBase iterator.
+ :param query: Query statement/format string
+ :param parameters: Optional dictionary used to format the query
+ :param settings: Optional dictionary of ClickHouse settings (key/string values)
+ :param fmt: ClickHouse output format
+ :param use_database Send the database parameter to ClickHouse so the command will be executed in the client
+ database context
+ :param external_data External data to send with the query
+ :return: io.IOBase stream/iterator for the result
+ """
+
+ def _raw_stream():
+ return self.client.raw_stream(query=query, parameters=parameters, settings=settings, fmt=fmt,
+ use_database=use_database, external_data=external_data)
+
+ loop = asyncio.get_running_loop()
+ result = await loop.run_in_executor(None, _raw_stream)
+ return result
+
+ async def query_np(self,
+ query: Optional[str] = None,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ query_formats: Optional[Dict[str, str]] = None,
+ column_formats: Optional[Dict[str, str]] = None,
+ encoding: Optional[str] = None,
+ use_none: Optional[bool] = None,
+ max_str_len: Optional[int] = None,
+ context: QueryContext = None,
+ external_data: Optional[ExternalData] = None):
+ """
+ Query method that returns the results as a numpy array.
+ For parameter values, see the create_query_context method.
+ :return: Numpy array representing the result set
+ """
+
+ def _query_np():
+ return self.client.query_np(query=query, parameters=parameters, settings=settings,
+ query_formats=query_formats, column_formats=column_formats, encoding=encoding,
+ use_none=use_none, max_str_len=max_str_len, context=context,
+ external_data=external_data)
+
+ loop = asyncio.get_running_loop()
+ result = await loop.run_in_executor(None, _query_np)
+ return result
+
+ async def query_np_stream(self,
+ query: Optional[str] = None,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ query_formats: Optional[Dict[str, str]] = None,
+ column_formats: Optional[Dict[str, str]] = None,
+ encoding: Optional[str] = None,
+ use_none: Optional[bool] = None,
+ max_str_len: Optional[int] = None,
+ context: QueryContext = None,
+ external_data: Optional[ExternalData] = None) -> StreamContext:
+ """
+ Query method that returns the results as a stream of numpy arrays.
+ For parameter values, see the create_query_context method.
+ :return: Generator that yield a numpy array per block representing the result set
+ """
+
+ def _query_np_stream():
+ return self.client.query_np_stream(query=query, parameters=parameters, settings=settings,
+ query_formats=query_formats, column_formats=column_formats,
+ encoding=encoding, use_none=use_none, max_str_len=max_str_len,
+ context=context, external_data=external_data)
+
+ loop = asyncio.get_running_loop()
+ result = await loop.run_in_executor(None, _query_np_stream)
+ return result
+
+ async def query_df(self,
+ query: Optional[str] = None,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ query_formats: Optional[Dict[str, str]] = None,
+ column_formats: Optional[Dict[str, str]] = None,
+ encoding: Optional[str] = None,
+ use_none: Optional[bool] = None,
+ max_str_len: Optional[int] = None,
+ use_na_values: Optional[bool] = None,
+ query_tz: Optional[str] = None,
+ column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
+ context: QueryContext = None,
+ external_data: Optional[ExternalData] = None,
+ use_extended_dtypes: Optional[bool] = None):
+ """
+ Query method that results the results as a pandas dataframe.
+ For parameter values, see the create_query_context method.
+ :return: Pandas dataframe representing the result set
+ """
+
+ def _query_df():
+ return self.client.query_df(query=query, parameters=parameters, settings=settings,
+ query_formats=query_formats, column_formats=column_formats, encoding=encoding,
+ use_none=use_none, max_str_len=max_str_len, use_na_values=use_na_values,
+ query_tz=query_tz, column_tzs=column_tzs, context=context,
+ external_data=external_data, use_extended_dtypes=use_extended_dtypes)
+
+ loop = asyncio.get_running_loop()
+ result = await loop.run_in_executor(None, _query_df)
+ return result
+
+ async def query_df_stream(self,
+ query: Optional[str] = None,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ query_formats: Optional[Dict[str, str]] = None,
+ column_formats: Optional[Dict[str, str]] = None,
+ encoding: Optional[str] = None,
+ use_none: Optional[bool] = None,
+ max_str_len: Optional[int] = None,
+ use_na_values: Optional[bool] = None,
+ query_tz: Optional[str] = None,
+ column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
+ context: QueryContext = None,
+ external_data: Optional[ExternalData] = None,
+ use_extended_dtypes: Optional[bool] = None) -> StreamContext:
+ """
+ Query method that returns the results as a StreamContext.
+ For parameter values, see the create_query_context method.
+ :return: Generator that yields a Pandas dataframe per block representing the result set
+ """
+
+ def _query_df_stream():
+ return self.client.query_df_stream(query=query, parameters=parameters, settings=settings,
+ query_formats=query_formats, column_formats=column_formats,
+ encoding=encoding,
+ use_none=use_none, max_str_len=max_str_len, use_na_values=use_na_values,
+ query_tz=query_tz, column_tzs=column_tzs, context=context,
+ external_data=external_data, use_extended_dtypes=use_extended_dtypes)
+
+ loop = asyncio.get_running_loop()
+ result = await loop.run_in_executor(None, _query_df_stream)
+ return result
+
+ def create_query_context(self,
+ query: Optional[Union[str, bytes]] = None,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ query_formats: Optional[Dict[str, str]] = None,
+ column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
+ encoding: Optional[str] = None,
+ use_none: Optional[bool] = None,
+ column_oriented: Optional[bool] = None,
+ use_numpy: Optional[bool] = False,
+ max_str_len: Optional[int] = 0,
+ context: Optional[QueryContext] = None,
+ query_tz: Optional[Union[str, tzinfo]] = None,
+ column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
+ use_na_values: Optional[bool] = None,
+ streaming: bool = False,
+ as_pandas: bool = False,
+ external_data: Optional[ExternalData] = None,
+ use_extended_dtypes: Optional[bool] = None) -> QueryContext:
+ """
+ Creates or updates a reusable QueryContext object
+ :param query: Query statement/format string
+ :param parameters: Optional dictionary used to format the query
+ :param settings: Optional dictionary of ClickHouse settings (key/string values)
+ :param query_formats: See QueryContext __init__ docstring
+ :param column_formats: See QueryContext __init__ docstring
+ :param encoding: See QueryContext __init__ docstring
+ :param use_none: Use None for ClickHouse NULL instead of default values. Note that using None in Numpy
+ arrays will force the numpy array dtype to 'object', which is often inefficient. This effect also
+ will impact the performance of Pandas dataframes.
+ :param column_oriented: Deprecated. Controls orientation of the QueryResult result_set property
+ :param use_numpy: Return QueryResult columns as one-dimensional numpy arrays
+ :param max_str_len: Limit returned ClickHouse String values to this length, which allows a Numpy
+ structured array even with ClickHouse variable length String columns. If 0, Numpy arrays for
+ String columns will always be object arrays
+ :param context: An existing QueryContext to be updated with any provided parameter values
+ :param query_tz Either a string or a pytz tzinfo object. (Strings will be converted to tzinfo objects).
+ Values for any DateTime or DateTime64 column in the query will be converted to Python datetime.datetime
+ objects with the selected timezone
+ :param column_tzs A dictionary of column names to tzinfo objects (or strings that will be converted to
+ tzinfo objects). The timezone will be applied to datetime objects returned in the query
+ :param use_na_values: Deprecated alias for use_advanced_dtypes
+ :param as_pandas Return the result columns as pandas.Series objects
+ :param streaming Marker used to correctly configure streaming queries
+ :param external_data ClickHouse "external data" to send with query
+ :param use_extended_dtypes: Only relevant to Pandas Dataframe queries. Use Pandas "missing types", such as
+ pandas.NA and pandas.NaT for ClickHouse NULL values, as well as extended Pandas dtypes such as IntegerArray
+ and StringArray. Defaulted to True for query_df methods
+ :return: Reusable QueryContext
+ """
+
+ return self.client.create_query_context(query=query, parameters=parameters, settings=settings,
+ query_formats=query_formats, column_formats=column_formats,
+ encoding=encoding, use_none=use_none,
+ column_oriented=column_oriented,
+ use_numpy=use_numpy, max_str_len=max_str_len, context=context,
+ query_tz=query_tz, column_tzs=column_tzs,
+ use_na_values=use_na_values,
+ streaming=streaming, as_pandas=as_pandas,
+ external_data=external_data,
+ use_extended_dtypes=use_extended_dtypes)
+
+ async def query_arrow(self,
+ query: str,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ use_strings: Optional[bool] = None,
+ external_data: Optional[ExternalData] = None):
+ """
+ Query method using the ClickHouse Arrow format to return a PyArrow table
+ :param query: Query statement/format string
+ :param parameters: Optional dictionary used to format the query
+ :param settings: Optional dictionary of ClickHouse settings (key/string values)
+ :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary)
+ :param external_data ClickHouse "external data" to send with query
+ :return: PyArrow.Table
+ """
+
+ def _query_arrow():
+ return self.client.query_arrow(query=query, parameters=parameters, settings=settings,
+ use_strings=use_strings, external_data=external_data)
+
+ loop = asyncio.get_running_loop()
+ result = await loop.run_in_executor(None, _query_arrow)
+ return result
+
+ async def query_arrow_stream(self,
+ query: str,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ settings: Optional[Dict[str, Any]] = None,
+ use_strings: Optional[bool] = None,
+ external_data: Optional[ExternalData] = None) -> StreamContext:
+ """
+ Query method that returns the results as a stream of Arrow tables
+ :param query: Query statement/format string
+ :param parameters: Optional dictionary used to format the query
+ :param settings: Optional dictionary of ClickHouse settings (key/string values)
+ :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary)
+ :param external_data ClickHouse "external data" to send with query
+ :return: Generator that yields a PyArrow.Table for per block representing the result set
+ """
+
+ def _query_arrow_stream():
+ return self.client.query_arrow_stream(query=query, parameters=parameters, settings=settings,
+ use_strings=use_strings, external_data=external_data)
+
+ loop = asyncio.get_running_loop()
+ result = await loop.run_in_executor(None, _query_arrow_stream)
+ return result
+
+ async def command(self,
+ cmd: str,
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
+ data: Union[str, bytes] = None,
+ settings: Dict[str, Any] = None,
+ use_database: bool = True,
+ external_data: Optional[ExternalData] = None) -> Union[str, int, Sequence[str], QuerySummary]:
+ """
+ Client method that returns a single value instead of a result set
+ :param cmd: ClickHouse query/command as a python format string
+ :param parameters: Optional dictionary of key/values pairs to be formatted
+ :param data: Optional 'data' for the command (for INSERT INTO in particular)
+ :param settings: Optional dictionary of ClickHouse settings (key/string values)
+ :param use_database: Send the database parameter to ClickHouse so the command will be executed in the client
+ database context. Otherwise, no database will be specified with the command. This is useful for determining
+ the default user database
+ :param external_data ClickHouse "external data" to send with command/query
+ :return: Decoded response from ClickHouse as either a string, int, or sequence of strings, or QuerySummary
+ if no data returned
+ """
+
+ def _command():
+ return self.client.command(cmd=cmd, parameters=parameters, data=data, settings=settings,
+ use_database=use_database, external_data=external_data)
+
+ loop = asyncio.get_running_loop()
+ result = await loop.run_in_executor(None, _command)
+ return result
+
+ async def ping(self) -> bool:
+ """
+ Validate the connection, does not throw an Exception (see debug logs)
+ :return: ClickHouse server is up and reachable
+ """
+
+ def _ping():
+ return self.client.ping()
+
+ loop = asyncio.get_running_loop()
+ result = await loop.run_in_executor(None, _ping)
+ return result
+
+ async def insert(self,
+ table: Optional[str] = None,
+ data: Sequence[Sequence[Any]] = None,
+ column_names: Union[str, Iterable[str]] = '*',
+ database: Optional[str] = None,
+ column_types: Sequence[ClickHouseType] = None,
+ column_type_names: Sequence[str] = None,
+ column_oriented: bool = False,
+ settings: Optional[Dict[str, Any]] = None,
+ context: InsertContext = None) -> QuerySummary:
+ """
+ Method to insert multiple rows/data matrix of native Python objects. If context is specified arguments
+ other than data are ignored
+ :param table: Target table
+ :param data: Sequence of sequences of Python data
+ :param column_names: Ordered list of column names or '*' if column types should be retrieved from the
+ ClickHouse table definition
+ :param database: Target database -- will use client default database if not specified.
+ :param column_types: ClickHouse column types. If set then column data does not need to be retrieved from
+ the server
+ :param column_type_names: ClickHouse column type names. If set then column data does not need to be
+ retrieved from the server
+ :param column_oriented: If true the data is already "pivoted" in column form
+ :param settings: Optional dictionary of ClickHouse settings (key/string values)
+ :param context: Optional reusable insert context to allow repeated inserts into the same table with
+ different data batches
+ :return: QuerySummary with summary information, throws exception if insert fails
+ """
+
+ def _insert():
+ return self.client.insert(table=table, data=data, column_names=column_names, database=database,
+ column_types=column_types, column_type_names=column_type_names,
+ column_oriented=column_oriented, settings=settings, context=context)
+
+ loop = asyncio.get_running_loop()
+ result = await loop.run_in_executor(None, _insert)
+ return result
+
+ async def insert_df(self, table: str = None,
+ df=None,
+ database: Optional[str] = None,
+ settings: Optional[Dict] = None,
+ column_names: Optional[Sequence[str]] = None,
+ column_types: Sequence[ClickHouseType] = None,
+ column_type_names: Sequence[str] = None,
+ context: InsertContext = None) -> QuerySummary:
+ """
+ Insert a pandas DataFrame into ClickHouse. If context is specified arguments other than df are ignored
+ :param table: ClickHouse table
+ :param df: two-dimensional pandas dataframe
+ :param database: Optional ClickHouse database
+ :param settings: Optional dictionary of ClickHouse settings (key/string values)
+ :param column_names: An optional list of ClickHouse column names. If not set, the DataFrame column names
+ will be used
+ :param column_types: ClickHouse column types. If set then column data does not need to be retrieved from
+ the server
+ :param column_type_names: ClickHouse column type names. If set then column data does not need to be
+ retrieved from the server
+ :param context: Optional reusable insert context to allow repeated inserts into the same table with
+ different data batches
+ :return: QuerySummary with summary information, throws exception if insert fails
+ """
+
+ def _insert_df():
+ return self.client.insert_df(table=table, df=df, database=database, settings=settings,
+ column_names=column_names,
+ column_types=column_types, column_type_names=column_type_names,
+ context=context)
+
+ loop = asyncio.get_running_loop()
+ result = await loop.run_in_executor(None, _insert_df)
+ return result
+
+ async def insert_arrow(self, table: str,
+ arrow_table, database: str = None,
+ settings: Optional[Dict] = None) -> QuerySummary:
+ """
+ Insert a PyArrow table DataFrame into ClickHouse using raw Arrow format
+ :param table: ClickHouse table
+ :param arrow_table: PyArrow Table object
+ :param database: Optional ClickHouse database
+ :param settings: Optional dictionary of ClickHouse settings (key/string values)
+ :return: QuerySummary with summary information, throws exception if insert fails
+ """
+
+ def _insert_arrow():
+ return self.client.insert_arrow(table=table, arrow_table=arrow_table, database=database, settings=settings)
+
+ loop = asyncio.get_running_loop()
+ result = await loop.run_in_executor(None, _insert_arrow)
+ return result
+
+ async def create_insert_context(self,
+ table: str,
+ column_names: Optional[Union[str, Sequence[str]]] = None,
+ database: Optional[str] = None,
+ column_types: Sequence[ClickHouseType] = None,
+ column_type_names: Sequence[str] = None,
+ column_oriented: bool = False,
+ settings: Optional[Dict[str, Any]] = None,
+ data: Optional[Sequence[Sequence[Any]]] = None) -> InsertContext:
+ """
+ Builds a reusable insert context to hold state for a duration of an insert
+ :param table: Target table
+ :param database: Target database. If not set, uses the client default database
+ :param column_names: Optional ordered list of column names. If not set, all columns ('*') will be assumed
+ in the order specified by the table definition
+ :param database: Target database -- will use client default database if not specified
+ :param column_types: ClickHouse column types. Optional Sequence of ClickHouseType objects. If neither column
+ types nor column type names are set, actual column types will be retrieved from the server.
+ :param column_type_names: ClickHouse column type names. Specified column types by name string
+ :param column_oriented: If true the data is already "pivoted" in column form
+ :param settings: Optional dictionary of ClickHouse settings (key/string values)
+ :param data: Initial dataset for insert
+ :return Reusable insert context
+ """
+
+ def _create_insert_context():
+ return self.client.create_insert_context(table=table, column_names=column_names, database=database,
+ column_types=column_types, column_type_names=column_type_names,
+ column_oriented=column_oriented, settings=settings, data=data)
+
+ loop = asyncio.get_running_loop()
+ result = await loop.run_in_executor(None, _create_insert_context)
+ return result
+
+ async def data_insert(self, context: InsertContext) -> QuerySummary:
+ """
+ Subclass implementation of the data insert
+ :context: InsertContext parameter object
+ :return: No return, throws an exception if the insert fails
+ """
+
+ def _data_insert():
+ return self.client.data_insert(context=context)
+
+ loop = asyncio.get_running_loop()
+ result = await loop.run_in_executor(None, _data_insert)
+ return result
+
+ async def raw_insert(self, table: str,
+ column_names: Optional[Sequence[str]] = None,
+ insert_block: Union[str, bytes, Generator[bytes, None, None], BinaryIO] = None,
+ settings: Optional[Dict] = None,
+ fmt: Optional[str] = None,
+ compression: Optional[str] = None) -> QuerySummary:
+ """
+ Insert data already formatted in a bytes object
+ :param table: Table name (whether qualified with the database name or not)
+ :param column_names: Sequence of column names
+ :param insert_block: Binary or string data already in a recognized ClickHouse format
+ :param settings: Optional dictionary of ClickHouse settings (key/string values)
+ :param compression: Recognized ClickHouse `Accept-Encoding` header compression value
+ :param fmt: Valid clickhouse format
+ """
+
+ def _raw_insert():
+ return self.client.raw_insert(table=table, column_names=column_names, insert_block=insert_block,
+ settings=settings, fmt=fmt, compression=compression)
+
+ loop = asyncio.get_running_loop()
+ result = await loop.run_in_executor(None, _raw_insert)
+ return result
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py
index 03b5bbe324..bf3c0d863f 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py
@@ -69,7 +69,8 @@ class HttpClient(Client):
https_proxy: Optional[str] = None,
server_host_name: Optional[str] = None,
apply_server_timezone: Optional[Union[str, bool]] = None,
- show_clickhouse_errors: Optional[bool] = None):
+ show_clickhouse_errors: Optional[bool] = None,
+ autogenerate_session_id: Optional[bool] = None):
"""
Create an HTTP ClickHouse Connect client
See clickhouse_connect.get_client for parameters
@@ -93,7 +94,7 @@ class HttpClient(Client):
# pylint: disable=too-many-boolean-expressions
if not self.http and (server_host_name or ca_cert or client_cert or not verify or https_proxy):
options = {'verify': verify is not False}
- dict_add(options,'ca_cert', ca_cert)
+ dict_add(options, 'ca_cert', ca_cert)
dict_add(options, 'client_cert', client_cert)
dict_add(options, 'client_cert_key', client_cert_key)
if server_host_name:
@@ -128,9 +129,14 @@ class HttpClient(Client):
self._progress_interval = None
self._active_session = None
+ # allow to override the global autogenerate_session_id setting via the constructor params
+ _autogenerate_session_id = common.get_setting('autogenerate_session_id') \
+ if autogenerate_session_id is None \
+ else autogenerate_session_id
+
if session_id:
ch_settings['session_id'] = session_id
- elif 'session_id' not in ch_settings and common.get_setting('autogenerate_session_id'):
+ elif 'session_id' not in ch_settings and _autogenerate_session_id:
ch_settings['session_id'] = str(uuid.uuid4())
if coerce_bool(compress):