diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-10-22 08:38:04 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-10-22 08:48:00 +0300 |
commit | ab5c8d6f28e47c9330ecc18cd748cf513fd243af (patch) | |
tree | 5086a58965aa21653226dc4f8d6a5bbdae560ff4 /contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py | |
parent | d9a31c562042fb72257b4050526a6d4d5415d815 (diff) | |
download | ydb-ab5c8d6f28e47c9330ecc18cd748cf513fd243af.tar.gz |
Intermediate changes
commit_hash:36f6f3674eaf6f39f4347c79c0086dbcb43d6f27
Diffstat (limited to 'contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py')
-rw-r--r-- | contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py | 47 |
1 files changed, 26 insertions, 21 deletions
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py index bf56c9783f..b63a14f776 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py @@ -1,5 +1,7 @@ import asyncio import io +import os +from concurrent.futures.thread import ThreadPoolExecutor from datetime import tzinfo from typing import Optional, Union, Dict, Any, Sequence, Iterable, Generator, BinaryIO @@ -20,10 +22,13 @@ class AsyncClient: Internally, each of the methods that uses IO is wrapped in a call to EventLoop.run_in_executor. """ - def __init__(self, *, client: Client): + def __init__(self, *, client: Client, executor_threads: int = 0): if isinstance(client, HttpClient): client.headers['User-Agent'] = client.headers['User-Agent'].replace('mode:sync;', 'mode:async;') self.client = client + if executor_threads == 0: + executor_threads = min(32, (os.cpu_count() or 1) + 4) # Mimic the default behavior + self.executor = ThreadPoolExecutor(max_workers=executor_threads) def set_client_setting(self, key, value): @@ -88,7 +93,7 @@ class AsyncClient: external_data=external_data) loop = asyncio.get_running_loop() - result = await loop.run_in_executor(None, _query) + result = await loop.run_in_executor(self.executor, _query) return result async def query_column_block_stream(self, @@ -117,7 +122,7 @@ class AsyncClient: external_data=external_data) loop = asyncio.get_running_loop() - result = await loop.run_in_executor(None, _query_column_block_stream) + result = await loop.run_in_executor(self.executor, _query_column_block_stream) return result async def query_row_block_stream(self, @@ -146,7 +151,7 @@ class AsyncClient: external_data=external_data) loop = asyncio.get_running_loop() - result = await loop.run_in_executor(None, _query_row_block_stream) + result = await loop.run_in_executor(self.executor, _query_row_block_stream) return result async def query_rows_stream(self, @@ -175,7 +180,7 @@ class AsyncClient: external_data=external_data) loop = asyncio.get_running_loop() - result = await loop.run_in_executor(None, _query_rows_stream) + result = await loop.run_in_executor(self.executor, _query_rows_stream) return result async def raw_query(self, @@ -202,7 +207,7 @@ class AsyncClient: use_database=use_database, external_data=external_data) loop = asyncio.get_running_loop() - result = await loop.run_in_executor(None, _raw_query) + result = await loop.run_in_executor(self.executor, _raw_query) return result async def raw_stream(self, query: str, @@ -228,7 +233,7 @@ class AsyncClient: use_database=use_database, external_data=external_data) loop = asyncio.get_running_loop() - result = await loop.run_in_executor(None, _raw_stream) + result = await loop.run_in_executor(self.executor, _raw_stream) return result async def query_np(self, @@ -255,7 +260,7 @@ class AsyncClient: external_data=external_data) loop = asyncio.get_running_loop() - result = await loop.run_in_executor(None, _query_np) + result = await loop.run_in_executor(self.executor, _query_np) return result async def query_np_stream(self, @@ -282,7 +287,7 @@ class AsyncClient: context=context, external_data=external_data) loop = asyncio.get_running_loop() - result = await loop.run_in_executor(None, _query_np_stream) + result = await loop.run_in_executor(self.executor, _query_np_stream) return result async def query_df(self, @@ -314,7 +319,7 @@ class AsyncClient: external_data=external_data, use_extended_dtypes=use_extended_dtypes) loop = asyncio.get_running_loop() - result = await loop.run_in_executor(None, _query_df) + result = await loop.run_in_executor(self.executor, _query_df) return result async def query_df_stream(self, @@ -347,7 +352,7 @@ class AsyncClient: external_data=external_data, use_extended_dtypes=use_extended_dtypes) loop = asyncio.get_running_loop() - result = await loop.run_in_executor(None, _query_df_stream) + result = await loop.run_in_executor(self.executor, _query_df_stream) return result def create_query_context(self, @@ -433,7 +438,7 @@ class AsyncClient: use_strings=use_strings, external_data=external_data) loop = asyncio.get_running_loop() - result = await loop.run_in_executor(None, _query_arrow) + result = await loop.run_in_executor(self.executor, _query_arrow) return result async def query_arrow_stream(self, @@ -457,7 +462,7 @@ class AsyncClient: use_strings=use_strings, external_data=external_data) loop = asyncio.get_running_loop() - result = await loop.run_in_executor(None, _query_arrow_stream) + result = await loop.run_in_executor(self.executor, _query_arrow_stream) return result async def command(self, @@ -486,7 +491,7 @@ class AsyncClient: use_database=use_database, external_data=external_data) loop = asyncio.get_running_loop() - result = await loop.run_in_executor(None, _command) + result = await loop.run_in_executor(self.executor, _command) return result async def ping(self) -> bool: @@ -499,7 +504,7 @@ class AsyncClient: return self.client.ping() loop = asyncio.get_running_loop() - result = await loop.run_in_executor(None, _ping) + result = await loop.run_in_executor(self.executor, _ping) return result async def insert(self, @@ -537,7 +542,7 @@ class AsyncClient: column_oriented=column_oriented, settings=settings, context=context) loop = asyncio.get_running_loop() - result = await loop.run_in_executor(None, _insert) + result = await loop.run_in_executor(self.executor, _insert) return result async def insert_df(self, table: str = None, @@ -572,7 +577,7 @@ class AsyncClient: context=context) loop = asyncio.get_running_loop() - result = await loop.run_in_executor(None, _insert_df) + result = await loop.run_in_executor(self.executor, _insert_df) return result async def insert_arrow(self, table: str, @@ -591,7 +596,7 @@ class AsyncClient: return self.client.insert_arrow(table=table, arrow_table=arrow_table, database=database, settings=settings) loop = asyncio.get_running_loop() - result = await loop.run_in_executor(None, _insert_arrow) + result = await loop.run_in_executor(self.executor, _insert_arrow) return result async def create_insert_context(self, @@ -625,7 +630,7 @@ class AsyncClient: column_oriented=column_oriented, settings=settings, data=data) loop = asyncio.get_running_loop() - result = await loop.run_in_executor(None, _create_insert_context) + result = await loop.run_in_executor(self.executor, _create_insert_context) return result async def data_insert(self, context: InsertContext) -> QuerySummary: @@ -639,7 +644,7 @@ class AsyncClient: return self.client.data_insert(context=context) loop = asyncio.get_running_loop() - result = await loop.run_in_executor(None, _data_insert) + result = await loop.run_in_executor(self.executor, _data_insert) return result async def raw_insert(self, table: str, @@ -663,5 +668,5 @@ class AsyncClient: settings=settings, fmt=fmt, compression=compression) loop = asyncio.get_running_loop() - result = await loop.run_in_executor(None, _raw_insert) + result = await loop.run_in_executor(self.executor, _raw_insert) return result |