diff options
author | robot-piglet <[email protected]> | 2025-08-01 00:01:09 +0300 |
---|---|---|
committer | robot-piglet <[email protected]> | 2025-08-01 00:11:46 +0300 |
commit | 75fd1fc757cc04e434a65784ae4ba6e28350878d (patch) | |
tree | def4a4c6e8a93c0f37b563a6bb86bc7936fc3912 /contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py | |
parent | f5d4ccd1e8d8054636ee31f953767a529801fcbf (diff) |
Intermediate changes
commit_hash:11a36b37f1d393ab351897e8a0b5bf4de5871fe0
Diffstat (limited to 'contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py')
-rw-r--r-- | contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py | 137 |
1 files changed, 96 insertions, 41 deletions
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py index b63a14f7761..4bb9f080948 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py @@ -30,7 +30,6 @@ class AsyncClient: executor_threads = min(32, (os.cpu_count() or 1) + 4) # Mimic the default behavior self.executor = ThreadPoolExecutor(max_workers=executor_threads) - def set_client_setting(self, key, value): """ Set a clickhouse setting for the client after initialization. If a setting is not recognized by ClickHouse, @@ -48,6 +47,13 @@ class AsyncClient: """ return self.client.get_client_setting(key=key) + def set_access_token(self, access_token: str): + """ + Set the ClickHouse access token for the client + :param access_token: Access token string + """ + self.client.set_access_token(access_token) + def min_version(self, version_str: str) -> bool: """ Determine whether the connected server is at least the submitted version @@ -58,11 +64,12 @@ class AsyncClient: """ return self.client.min_version(version_str) - def close(self): + async def close(self): """ Subclass implementation to close the connection to the server/deallocate the client """ self.client.close() + await asyncio.to_thread(self.executor.shutdown, True) async def query(self, query: Optional[str] = None, @@ -78,7 +85,8 @@ class AsyncClient: context: QueryContext = None, query_tz: Optional[Union[str, tzinfo]] = None, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, - external_data: Optional[ExternalData] = None) -> QueryResult: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> QueryResult: """ Main query method for SELECT, DESCRIBE and other SQL statements that return a result matrix. For parameters, see the create_query_context method. @@ -90,7 +98,7 @@ class AsyncClient: column_formats=column_formats, encoding=encoding, use_none=use_none, column_oriented=column_oriented, use_numpy=use_numpy, max_str_len=max_str_len, context=context, query_tz=query_tz, column_tzs=column_tzs, - external_data=external_data) + external_data=external_data, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query) @@ -107,7 +115,9 @@ class AsyncClient: context: QueryContext = None, query_tz: Optional[Union[str, tzinfo]] = None, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None, + ) -> StreamContext: """ Variation of main query method that returns a stream of column oriented blocks. For parameters, see the create_query_context method. @@ -119,7 +129,7 @@ class AsyncClient: query_formats=query_formats, column_formats=column_formats, encoding=encoding, use_none=use_none, context=context, query_tz=query_tz, column_tzs=column_tzs, - external_data=external_data) + external_data=external_data, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_column_block_stream) @@ -136,7 +146,8 @@ class AsyncClient: context: QueryContext = None, query_tz: Optional[Union[str, tzinfo]] = None, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Variation of main query method that returns a stream of row oriented blocks. For parameters, see the create_query_context method. @@ -148,7 +159,7 @@ class AsyncClient: query_formats=query_formats, column_formats=column_formats, encoding=encoding, use_none=use_none, context=context, query_tz=query_tz, column_tzs=column_tzs, - external_data=external_data) + external_data=external_data, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_row_block_stream) @@ -165,7 +176,8 @@ class AsyncClient: context: QueryContext = None, query_tz: Optional[Union[str, tzinfo]] = None, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Variation of main query method that returns a stream of row oriented blocks. For parameters, see the create_query_context method. @@ -177,7 +189,7 @@ class AsyncClient: query_formats=query_formats, column_formats=column_formats, encoding=encoding, use_none=use_none, context=context, query_tz=query_tz, column_tzs=column_tzs, - external_data=external_data) + external_data=external_data, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_rows_stream) @@ -189,7 +201,8 @@ class AsyncClient: settings: Optional[Dict[str, Any]] = None, fmt: str = None, use_database: bool = True, - external_data: Optional[ExternalData] = None) -> bytes: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> bytes: """ Query method that simply returns the raw ClickHouse format bytes. :param query: Query statement/format string @@ -199,12 +212,14 @@ class AsyncClient: :param use_database Send the database parameter to ClickHouse so the command will be executed in the client database context :param external_data External data to send with the query + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: bytes representing raw ClickHouse return value based on format """ def _raw_query(): return self.client.raw_query(query=query, parameters=parameters, settings=settings, fmt=fmt, - use_database=use_database, external_data=external_data) + use_database=use_database, external_data=external_data, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _raw_query) @@ -215,7 +230,8 @@ class AsyncClient: settings: Optional[Dict[str, Any]] = None, fmt: str = None, use_database: bool = True, - external_data: Optional[ExternalData] = None) -> io.IOBase: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> io.IOBase: """ Query method that returns the result as an io.IOBase iterator. :param query: Query statement/format string @@ -225,12 +241,13 @@ class AsyncClient: :param use_database Send the database parameter to ClickHouse so the command will be executed in the client database context :param external_data External data to send with the query + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: io.IOBase stream/iterator for the result """ def _raw_stream(): return self.client.raw_stream(query=query, parameters=parameters, settings=settings, fmt=fmt, - use_database=use_database, external_data=external_data) + use_database=use_database, external_data=external_data, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _raw_stream) @@ -246,7 +263,8 @@ class AsyncClient: use_none: Optional[bool] = None, max_str_len: Optional[int] = None, context: QueryContext = None, - external_data: Optional[ExternalData] = None): + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None): """ Query method that returns the results as a numpy array. For parameter values, see the create_query_context method. @@ -257,7 +275,7 @@ class AsyncClient: return self.client.query_np(query=query, parameters=parameters, settings=settings, query_formats=query_formats, column_formats=column_formats, encoding=encoding, use_none=use_none, max_str_len=max_str_len, context=context, - external_data=external_data) + external_data=external_data, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_np) @@ -273,7 +291,8 @@ class AsyncClient: use_none: Optional[bool] = None, max_str_len: Optional[int] = None, context: QueryContext = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Query method that returns the results as a stream of numpy arrays. For parameter values, see the create_query_context method. @@ -284,7 +303,7 @@ class AsyncClient: return self.client.query_np_stream(query=query, parameters=parameters, settings=settings, query_formats=query_formats, column_formats=column_formats, encoding=encoding, use_none=use_none, max_str_len=max_str_len, - context=context, external_data=external_data) + context=context, external_data=external_data, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_np_stream) @@ -304,7 +323,8 @@ class AsyncClient: column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, context: QueryContext = None, external_data: Optional[ExternalData] = None, - use_extended_dtypes: Optional[bool] = None): + use_extended_dtypes: Optional[bool] = None, + transport_settings: Optional[Dict[str, str]] = None): """ Query method that results the results as a pandas dataframe. For parameter values, see the create_query_context method. @@ -316,7 +336,8 @@ class AsyncClient: query_formats=query_formats, column_formats=column_formats, encoding=encoding, use_none=use_none, max_str_len=max_str_len, use_na_values=use_na_values, query_tz=query_tz, column_tzs=column_tzs, context=context, - external_data=external_data, use_extended_dtypes=use_extended_dtypes) + external_data=external_data, use_extended_dtypes=use_extended_dtypes, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_df) @@ -336,7 +357,8 @@ class AsyncClient: column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, context: QueryContext = None, external_data: Optional[ExternalData] = None, - use_extended_dtypes: Optional[bool] = None) -> StreamContext: + use_extended_dtypes: Optional[bool] = None, + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Query method that returns the results as a StreamContext. For parameter values, see the create_query_context method. @@ -349,7 +371,8 @@ class AsyncClient: encoding=encoding, use_none=use_none, max_str_len=max_str_len, use_na_values=use_na_values, query_tz=query_tz, column_tzs=column_tzs, context=context, - external_data=external_data, use_extended_dtypes=use_extended_dtypes) + external_data=external_data, use_extended_dtypes=use_extended_dtypes, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_df_stream) @@ -373,7 +396,8 @@ class AsyncClient: streaming: bool = False, as_pandas: bool = False, external_data: Optional[ExternalData] = None, - use_extended_dtypes: Optional[bool] = None) -> QueryContext: + use_extended_dtypes: Optional[bool] = None, + transport_settings: Optional[Dict[str, str]] = None) -> QueryContext: """ Creates or updates a reusable QueryContext object :param query: Query statement/format string @@ -403,6 +427,7 @@ class AsyncClient: :param use_extended_dtypes: Only relevant to Pandas Dataframe queries. Use Pandas "missing types", such as pandas.NA and pandas.NaT for ClickHouse NULL values, as well as extended Pandas dtypes such as IntegerArray and StringArray. Defaulted to True for query_df methods + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: Reusable QueryContext """ @@ -415,14 +440,16 @@ class AsyncClient: use_na_values=use_na_values, streaming=streaming, as_pandas=as_pandas, external_data=external_data, - use_extended_dtypes=use_extended_dtypes) + use_extended_dtypes=use_extended_dtypes, + transport_settings=transport_settings) async def query_arrow(self, query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, settings: Optional[Dict[str, Any]] = None, use_strings: Optional[bool] = None, - external_data: Optional[ExternalData] = None): + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None): """ Query method using the ClickHouse Arrow format to return a PyArrow table :param query: Query statement/format string @@ -430,12 +457,14 @@ class AsyncClient: :param settings: Optional dictionary of ClickHouse settings (key/string values) :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary) :param external_data ClickHouse "external data" to send with query + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: PyArrow.Table """ def _query_arrow(): return self.client.query_arrow(query=query, parameters=parameters, settings=settings, - use_strings=use_strings, external_data=external_data) + use_strings=use_strings, external_data=external_data, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_arrow) @@ -446,7 +475,8 @@ class AsyncClient: parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, settings: Optional[Dict[str, Any]] = None, use_strings: Optional[bool] = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Query method that returns the results as a stream of Arrow tables :param query: Query statement/format string @@ -454,12 +484,14 @@ class AsyncClient: :param settings: Optional dictionary of ClickHouse settings (key/string values) :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary) :param external_data ClickHouse "external data" to send with query + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: Generator that yields a PyArrow.Table for per block representing the result set """ def _query_arrow_stream(): return self.client.query_arrow_stream(query=query, parameters=parameters, settings=settings, - use_strings=use_strings, external_data=external_data) + use_strings=use_strings, external_data=external_data, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_arrow_stream) @@ -471,7 +503,8 @@ class AsyncClient: data: Union[str, bytes] = None, settings: Dict[str, Any] = None, use_database: bool = True, - external_data: Optional[ExternalData] = None) -> Union[str, int, Sequence[str], QuerySummary]: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> Union[str, int, Sequence[str], QuerySummary]: """ Client method that returns a single value instead of a result set :param cmd: ClickHouse query/command as a python format string @@ -482,13 +515,15 @@ class AsyncClient: database context. Otherwise, no database will be specified with the command. This is useful for determining the default user database :param external_data ClickHouse "external data" to send with command/query + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: Decoded response from ClickHouse as either a string, int, or sequence of strings, or QuerySummary if no data returned """ def _command(): return self.client.command(cmd=cmd, parameters=parameters, data=data, settings=settings, - use_database=use_database, external_data=external_data) + use_database=use_database, external_data=external_data, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _command) @@ -516,7 +551,8 @@ class AsyncClient: column_type_names: Sequence[str] = None, column_oriented: bool = False, settings: Optional[Dict[str, Any]] = None, - context: InsertContext = None) -> QuerySummary: + context: InsertContext = None, + transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary: """ Method to insert multiple rows/data matrix of native Python objects. If context is specified arguments other than data are ignored @@ -533,13 +569,15 @@ class AsyncClient: :param settings: Optional dictionary of ClickHouse settings (key/string values) :param context: Optional reusable insert context to allow repeated inserts into the same table with different data batches + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: QuerySummary with summary information, throws exception if insert fails """ def _insert(): return self.client.insert(table=table, data=data, column_names=column_names, database=database, column_types=column_types, column_type_names=column_type_names, - column_oriented=column_oriented, settings=settings, context=context) + column_oriented=column_oriented, settings=settings, context=context, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _insert) @@ -552,7 +590,8 @@ class AsyncClient: column_names: Optional[Sequence[str]] = None, column_types: Sequence[ClickHouseType] = None, column_type_names: Sequence[str] = None, - context: InsertContext = None) -> QuerySummary: + context: InsertContext = None, + transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary: """ Insert a pandas DataFrame into ClickHouse. If context is specified arguments other than df are ignored :param table: ClickHouse table @@ -567,6 +606,7 @@ class AsyncClient: retrieved from the server :param context: Optional reusable insert context to allow repeated inserts into the same table with different data batches + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: QuerySummary with summary information, throws exception if insert fails """ @@ -574,7 +614,7 @@ class AsyncClient: return self.client.insert_df(table=table, df=df, database=database, settings=settings, column_names=column_names, column_types=column_types, column_type_names=column_type_names, - context=context) + context=context, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _insert_df) @@ -582,18 +622,21 @@ class AsyncClient: async def insert_arrow(self, table: str, arrow_table, database: str = None, - settings: Optional[Dict] = None) -> QuerySummary: + settings: Optional[Dict] = None, + transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary: """ Insert a PyArrow table DataFrame into ClickHouse using raw Arrow format :param table: ClickHouse table :param arrow_table: PyArrow Table object :param database: Optional ClickHouse database :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: QuerySummary with summary information, throws exception if insert fails """ def _insert_arrow(): - return self.client.insert_arrow(table=table, arrow_table=arrow_table, database=database, settings=settings) + return self.client.insert_arrow(table=table, arrow_table=arrow_table, database=database, + settings=settings, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _insert_arrow) @@ -607,7 +650,8 @@ class AsyncClient: column_type_names: Sequence[str] = None, column_oriented: bool = False, settings: Optional[Dict[str, Any]] = None, - data: Optional[Sequence[Sequence[Any]]] = None) -> InsertContext: + data: Optional[Sequence[Sequence[Any]]] = None, + transport_settings: Optional[Dict[str, str]] = None) -> InsertContext: """ Builds a reusable insert context to hold state for a duration of an insert :param table: Target table @@ -621,13 +665,15 @@ class AsyncClient: :param column_oriented: If true the data is already "pivoted" in column form :param settings: Optional dictionary of ClickHouse settings (key/string values) :param data: Initial dataset for insert - :return Reusable insert context + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) + :return: Reusable insert context """ def _create_insert_context(): return self.client.create_insert_context(table=table, column_names=column_names, database=database, column_types=column_types, column_type_names=column_type_names, - column_oriented=column_oriented, settings=settings, data=data) + column_oriented=column_oriented, settings=settings, data=data, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _create_insert_context) @@ -652,7 +698,8 @@ class AsyncClient: insert_block: Union[str, bytes, Generator[bytes, None, None], BinaryIO] = None, settings: Optional[Dict] = None, fmt: Optional[str] = None, - compression: Optional[str] = None) -> QuerySummary: + compression: Optional[str] = None, + transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary: """ Insert data already formatted in a bytes object :param table: Table name (whether qualified with the database name or not) @@ -660,13 +707,21 @@ class AsyncClient: :param insert_block: Binary or string data already in a recognized ClickHouse format :param settings: Optional dictionary of ClickHouse settings (key/string values) :param compression: Recognized ClickHouse `Accept-Encoding` header compression value + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :param fmt: Valid clickhouse format """ def _raw_insert(): return self.client.raw_insert(table=table, column_names=column_names, insert_block=insert_block, - settings=settings, fmt=fmt, compression=compression) + settings=settings, fmt=fmt, compression=compression, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _raw_insert) return result + + async def __aenter__(self) -> "AsyncClient": + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + await self.close() |