aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-10-22 08:38:04 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-10-22 08:48:00 +0300
commitab5c8d6f28e47c9330ecc18cd748cf513fd243af (patch)
tree5086a58965aa21653226dc4f8d6a5bbdae560ff4 /contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py
parentd9a31c562042fb72257b4050526a6d4d5415d815 (diff)
downloadydb-ab5c8d6f28e47c9330ecc18cd748cf513fd243af.tar.gz
Intermediate changes
commit_hash:36f6f3674eaf6f39f4347c79c0086dbcb43d6f27
Diffstat (limited to 'contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py')
-rw-r--r--contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py47
1 files changed, 26 insertions, 21 deletions
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py
index bf56c9783f..b63a14f776 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py
@@ -1,5 +1,7 @@
import asyncio
import io
+import os
+from concurrent.futures.thread import ThreadPoolExecutor
from datetime import tzinfo
from typing import Optional, Union, Dict, Any, Sequence, Iterable, Generator, BinaryIO
@@ -20,10 +22,13 @@ class AsyncClient:
Internally, each of the methods that uses IO is wrapped in a call to EventLoop.run_in_executor.
"""
- def __init__(self, *, client: Client):
+ def __init__(self, *, client: Client, executor_threads: int = 0):
if isinstance(client, HttpClient):
client.headers['User-Agent'] = client.headers['User-Agent'].replace('mode:sync;', 'mode:async;')
self.client = client
+ if executor_threads == 0:
+ executor_threads = min(32, (os.cpu_count() or 1) + 4) # Mimic the default behavior
+ self.executor = ThreadPoolExecutor(max_workers=executor_threads)
def set_client_setting(self, key, value):
@@ -88,7 +93,7 @@ class AsyncClient:
external_data=external_data)
loop = asyncio.get_running_loop()
- result = await loop.run_in_executor(None, _query)
+ result = await loop.run_in_executor(self.executor, _query)
return result
async def query_column_block_stream(self,
@@ -117,7 +122,7 @@ class AsyncClient:
external_data=external_data)
loop = asyncio.get_running_loop()
- result = await loop.run_in_executor(None, _query_column_block_stream)
+ result = await loop.run_in_executor(self.executor, _query_column_block_stream)
return result
async def query_row_block_stream(self,
@@ -146,7 +151,7 @@ class AsyncClient:
external_data=external_data)
loop = asyncio.get_running_loop()
- result = await loop.run_in_executor(None, _query_row_block_stream)
+ result = await loop.run_in_executor(self.executor, _query_row_block_stream)
return result
async def query_rows_stream(self,
@@ -175,7 +180,7 @@ class AsyncClient:
external_data=external_data)
loop = asyncio.get_running_loop()
- result = await loop.run_in_executor(None, _query_rows_stream)
+ result = await loop.run_in_executor(self.executor, _query_rows_stream)
return result
async def raw_query(self,
@@ -202,7 +207,7 @@ class AsyncClient:
use_database=use_database, external_data=external_data)
loop = asyncio.get_running_loop()
- result = await loop.run_in_executor(None, _raw_query)
+ result = await loop.run_in_executor(self.executor, _raw_query)
return result
async def raw_stream(self, query: str,
@@ -228,7 +233,7 @@ class AsyncClient:
use_database=use_database, external_data=external_data)
loop = asyncio.get_running_loop()
- result = await loop.run_in_executor(None, _raw_stream)
+ result = await loop.run_in_executor(self.executor, _raw_stream)
return result
async def query_np(self,
@@ -255,7 +260,7 @@ class AsyncClient:
external_data=external_data)
loop = asyncio.get_running_loop()
- result = await loop.run_in_executor(None, _query_np)
+ result = await loop.run_in_executor(self.executor, _query_np)
return result
async def query_np_stream(self,
@@ -282,7 +287,7 @@ class AsyncClient:
context=context, external_data=external_data)
loop = asyncio.get_running_loop()
- result = await loop.run_in_executor(None, _query_np_stream)
+ result = await loop.run_in_executor(self.executor, _query_np_stream)
return result
async def query_df(self,
@@ -314,7 +319,7 @@ class AsyncClient:
external_data=external_data, use_extended_dtypes=use_extended_dtypes)
loop = asyncio.get_running_loop()
- result = await loop.run_in_executor(None, _query_df)
+ result = await loop.run_in_executor(self.executor, _query_df)
return result
async def query_df_stream(self,
@@ -347,7 +352,7 @@ class AsyncClient:
external_data=external_data, use_extended_dtypes=use_extended_dtypes)
loop = asyncio.get_running_loop()
- result = await loop.run_in_executor(None, _query_df_stream)
+ result = await loop.run_in_executor(self.executor, _query_df_stream)
return result
def create_query_context(self,
@@ -433,7 +438,7 @@ class AsyncClient:
use_strings=use_strings, external_data=external_data)
loop = asyncio.get_running_loop()
- result = await loop.run_in_executor(None, _query_arrow)
+ result = await loop.run_in_executor(self.executor, _query_arrow)
return result
async def query_arrow_stream(self,
@@ -457,7 +462,7 @@ class AsyncClient:
use_strings=use_strings, external_data=external_data)
loop = asyncio.get_running_loop()
- result = await loop.run_in_executor(None, _query_arrow_stream)
+ result = await loop.run_in_executor(self.executor, _query_arrow_stream)
return result
async def command(self,
@@ -486,7 +491,7 @@ class AsyncClient:
use_database=use_database, external_data=external_data)
loop = asyncio.get_running_loop()
- result = await loop.run_in_executor(None, _command)
+ result = await loop.run_in_executor(self.executor, _command)
return result
async def ping(self) -> bool:
@@ -499,7 +504,7 @@ class AsyncClient:
return self.client.ping()
loop = asyncio.get_running_loop()
- result = await loop.run_in_executor(None, _ping)
+ result = await loop.run_in_executor(self.executor, _ping)
return result
async def insert(self,
@@ -537,7 +542,7 @@ class AsyncClient:
column_oriented=column_oriented, settings=settings, context=context)
loop = asyncio.get_running_loop()
- result = await loop.run_in_executor(None, _insert)
+ result = await loop.run_in_executor(self.executor, _insert)
return result
async def insert_df(self, table: str = None,
@@ -572,7 +577,7 @@ class AsyncClient:
context=context)
loop = asyncio.get_running_loop()
- result = await loop.run_in_executor(None, _insert_df)
+ result = await loop.run_in_executor(self.executor, _insert_df)
return result
async def insert_arrow(self, table: str,
@@ -591,7 +596,7 @@ class AsyncClient:
return self.client.insert_arrow(table=table, arrow_table=arrow_table, database=database, settings=settings)
loop = asyncio.get_running_loop()
- result = await loop.run_in_executor(None, _insert_arrow)
+ result = await loop.run_in_executor(self.executor, _insert_arrow)
return result
async def create_insert_context(self,
@@ -625,7 +630,7 @@ class AsyncClient:
column_oriented=column_oriented, settings=settings, data=data)
loop = asyncio.get_running_loop()
- result = await loop.run_in_executor(None, _create_insert_context)
+ result = await loop.run_in_executor(self.executor, _create_insert_context)
return result
async def data_insert(self, context: InsertContext) -> QuerySummary:
@@ -639,7 +644,7 @@ class AsyncClient:
return self.client.data_insert(context=context)
loop = asyncio.get_running_loop()
- result = await loop.run_in_executor(None, _data_insert)
+ result = await loop.run_in_executor(self.executor, _data_insert)
return result
async def raw_insert(self, table: str,
@@ -663,5 +668,5 @@ class AsyncClient:
settings=settings, fmt=fmt, compression=compression)
loop = asyncio.get_running_loop()
- result = await loop.run_in_executor(None, _raw_insert)
+ result = await loop.run_in_executor(self.executor, _raw_insert)
return result