summaryrefslogtreecommitdiffstats
path: root/contrib/python/clickhouse-connect
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2026-06-03 11:02:34 +0300
committerrobot-piglet <[email protected]>2026-06-03 11:26:06 +0300
commit23e2a014ee64590656fe587d35005c5aac02ccd8 (patch)
tree5c7bc29ac1266549dcf83076c5748646ece582da /contrib/python/clickhouse-connect
parent37780c25523e7295b8b4887fb28fbb05fbb8e37d (diff)
Intermediate changes
commit_hash:f088ee5140fe7cfb904b3d2eef65c385a73148fb
Diffstat (limited to 'contrib/python/clickhouse-connect')
-rw-r--r--contrib/python/clickhouse-connect/.dist-info/METADATA2
-rw-r--r--contrib/python/clickhouse-connect/clickhouse_connect/_version.py2
-rw-r--r--contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/compiler.py24
-rw-r--r--contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py178
-rw-r--r--contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py2
-rw-r--r--contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py9
-rw-r--r--contrib/python/clickhouse-connect/clickhouse_connect/driver/streaming.py8
-rw-r--r--contrib/python/clickhouse-connect/clickhouse_connect/driver/tzutil.py49
-rw-r--r--contrib/python/clickhouse-connect/ya.make2
9 files changed, 231 insertions, 45 deletions
diff --git a/contrib/python/clickhouse-connect/.dist-info/METADATA b/contrib/python/clickhouse-connect/.dist-info/METADATA
index 4a4168b9536..3f24e9dbb73 100644
--- a/contrib/python/clickhouse-connect/.dist-info/METADATA
+++ b/contrib/python/clickhouse-connect/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.4
Name: clickhouse-connect
-Version: 1.0.0
+Version: 1.0.1
Summary: ClickHouse Database Core Driver for Python, Pandas, and Superset
Home-page: https://github.com/ClickHouse/clickhouse-connect
Author: ClickHouse Inc.
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/_version.py b/contrib/python/clickhouse-connect/clickhouse_connect/_version.py
index 11a716ec1fe..86774adcc16 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/_version.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/_version.py
@@ -1 +1 @@
-version = "1.0.0"
+version = "1.0.1"
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/compiler.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/compiler.py
index 9443cee553e..bbe7a4e9640 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/compiler.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/compiler.py
@@ -40,6 +40,10 @@ def _resolve_ch_type_name(sqla_type):
class ChStatementCompiler(SQLCompiler):
+ def _raise_on_escape(self, binary, operator_name: str):
+ if binary.modifiers.get("escape") is not None:
+ raise CompileError(f"ClickHouse does not support the ESCAPE clause on {operator_name}")
+
def visit_delete(self, delete_stmt, visiting_cte=None, **kw):
table = delete_stmt.table
text = f"DELETE FROM {format_table(table)}"
@@ -261,3 +265,23 @@ class ChStatementCompiler(SQLCompiler):
if mods and alias in mods:
result += " " + mods[alias]
return result
+
+ def visit_like_op_binary(self, binary, operator, **kw):
+ self._raise_on_escape(binary, "LIKE")
+ return super().visit_like_op_binary(binary, operator, **kw)
+
+ def visit_not_like_op_binary(self, binary, operator, **kw):
+ self._raise_on_escape(binary, "LIKE")
+ return super().visit_not_like_op_binary(binary, operator, **kw)
+
+ def visit_ilike_op_binary(self, binary, operator, **kw):
+ self._raise_on_escape(binary, "ILIKE")
+ left = self.process(binary.left, **kw)
+ right = self.process(binary.right, **kw)
+ return f"{left} ILIKE {right}"
+
+ def visit_not_ilike_op_binary(self, binary, operator, **kw):
+ self._raise_on_escape(binary, "ILIKE")
+ left = self.process(binary.left, **kw)
+ right = self.process(binary.right, **kw)
+ return f"{left} NOT ILIKE {right}"
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py
index 7e8c80187e7..973018cc521 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py
@@ -98,6 +98,53 @@ class BytesSource:
"""No-op close method for compatibility."""
+class _SessionLease:
+ """An aiohttp.ClientSession with an in-flight request count, so close()
+ can wait for outstanding requests to drain before tearing down the session."""
+
+ __slots__ = ("session", "_inflight", "_drained")
+
+ def __init__(self, session: aiohttp.ClientSession):
+ self.session = session
+ self._inflight = 0
+ self._drained = asyncio.Event()
+ self._drained.set()
+
+ def acquire(self) -> None:
+ self._inflight += 1
+ if self._inflight == 1:
+ self._drained.clear()
+
+ def release(self) -> None:
+ self._inflight -= 1
+ if self._inflight == 0:
+ self._drained.set()
+
+ async def wait_drained(self) -> None:
+ await self._drained.wait()
+
+
+def _one_shot(fn: Callable[[], None]) -> Callable[[], None]:
+ """Returns a wrapper that invokes fn at most once."""
+ fired = False
+
+ def call():
+ nonlocal fired
+ if not fired:
+ fired = True
+ fn()
+
+ return call
+
+
+def _release_lease(response: aiohttp.ClientResponse | None) -> None:
+ if response is None:
+ return
+ release = getattr(response, "_lease_release", None)
+ if release is not None:
+ release()
+
+
class AsyncClient(Client):
valid_transport_settings = {
"database",
@@ -231,7 +278,7 @@ class AsyncClient(Client):
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
elif ca_cert:
- ssl_context.load_verify_locations(ca_cert)
+ ssl_context.load_verify_locations(httputil.resolve_ca_cert(ca_cert))
if client_cert:
ssl_context.load_cert_chain(client_cert, client_cert_key)
@@ -250,7 +297,8 @@ class AsyncClient(Client):
if sys.version_info < (3, 12, 7) or sys.version_info[:3] == (3, 13, 0):
self._connector_kwargs["enable_cleanup_closed"] = True
- self._session = None
+ self._session_lease: _SessionLease | None = None
+ self._session_lock = asyncio.Lock()
self._read_format = "Native"
self._write_format = "Native"
self._transform = NativeTransform()
@@ -284,6 +332,15 @@ class AsyncClient(Client):
autoconnect=False,
)
+ @property
+ def _session(self) -> aiohttp.ClientSession | None:
+ lease = self._session_lease
+ return lease.session if lease is not None else None
+
+ @_session.setter
+ def _session(self, value: aiohttp.ClientSession | None) -> None:
+ self._session_lease = _SessionLease(value) if value is not None else None
+
async def _initialize(self):
"""
Async equivalent of Client._init_common_settings.
@@ -311,7 +368,7 @@ class AsyncClient(Client):
self.server_version, server_tz_str = tuple(row)
try:
server_tz = tzutil.resolve_zone(server_tz_str)
- server_tz, self._dst_safe = tzutil.normalize_timezone(server_tz)
+ server_tz, self._dst_safe = tzutil.normalize_timezone(server_tz, trust_fixed_offset=True)
self.server_tz = server_tz
except zoneinfo.ZoneInfoNotFoundError:
logger.warning(
@@ -420,22 +477,31 @@ class AsyncClient(Client):
return False
async def close(self): # type: ignore[override]
- if self._session:
- await self._session.close()
+ async with self._session_lock:
+ old_lease = self._session_lease
+ self._session_lease = None
+ if old_lease is not None:
+ await old_lease.wait_drained()
+ await old_lease.session.close()
async def close_connections(self): # type: ignore[override]
- """Close all pooled connections and recreate session"""
- if self._session:
- await self._session.close()
- connector = aiohttp.TCPConnector(**self._connector_kwargs)
- self._session = aiohttp.ClientSession(
- connector=connector,
- timeout=self._timeout,
- headers=self.headers,
- trust_env=False,
- auto_decompress=False,
- skip_auto_headers={"Accept-Encoding"},
- )
+ """Rotate the connection pool: new requests use a fresh session; in-flight
+ requests keep using the old session until they complete, then it's closed."""
+ async with self._session_lock:
+ old_lease = self._session_lease
+ connector = aiohttp.TCPConnector(**self._connector_kwargs)
+ new_session = aiohttp.ClientSession(
+ connector=connector,
+ timeout=self._timeout,
+ headers=self.headers,
+ trust_env=False,
+ auto_decompress=False,
+ skip_auto_headers={"Accept-Encoding"},
+ )
+ self._session_lease = _SessionLease(new_session)
+ if old_lease is not None:
+ await old_lease.wait_drained()
+ await old_lease.session.close()
def set_client_setting(self, key, value):
str_value = self._validate_setting(key, value, common.get_setting("invalid_setting_action"))
@@ -496,8 +562,11 @@ class AsyncClient(Client):
params.update(context.bind_params)
response = await self._raw_request(fmt_json_query, params, headers, retries=self.query_retries)
- body = await response.read()
- encoding = response.headers.get("Content-Encoding")
+ try:
+ body = await response.read()
+ encoding = response.headers.get("Content-Encoding")
+ finally:
+ _release_lease(response)
loop = asyncio.get_running_loop()
def decompress_and_parse_json():
@@ -874,9 +943,12 @@ class AsyncClient(Client):
headers = dict_copy(headers, transport_settings)
method = "POST" if payload or files else "GET"
response = await self._raw_request(payload, params, headers, files=files, method=method, server_wait=False)
- body = await response.read()
- encoding = response.headers.get("Content-Encoding")
- summary = self._summary(response)
+ try:
+ body = await response.read()
+ encoding = response.headers.get("Content-Encoding")
+ summary = self._summary(response)
+ finally:
+ _release_lease(response)
if not body:
return QuerySummary(summary)
@@ -902,14 +974,22 @@ class AsyncClient(Client):
return await loop.run_in_executor(None, decompress_and_decode)
async def ping(self) -> bool: # type: ignore[override]
+ async with self._session_lock:
+ lease = self._session_lease
+ if lease is None or lease.session.closed:
+ return False
+ session = lease.session
+ lease.acquire()
try:
url = f"{self.url}/ping"
timeout = aiohttp.ClientTimeout(total=3.0)
- async with self._session.get(url, timeout=timeout) as response:
+ async with session.get(url, timeout=timeout) as response:
return 200 <= response.status < 300
except (aiohttp.ClientError, asyncio.TimeoutError):
logger.debug("ping failed", exc_info=True)
return False
+ finally:
+ lease.release()
async def raw_query( # type: ignore[override]
self,
@@ -929,8 +1009,11 @@ class AsyncClient(Client):
headers = dict_copy(headers, transport_settings)
response = await self._raw_request(body, params, headers=headers, files=files, retries=self.query_retries)
- response_data = await response.read()
- encoding = response.headers.get("Content-Encoding")
+ try:
+ response_data = await response.read()
+ encoding = response.headers.get("Content-Encoding")
+ finally:
+ _release_lease(response)
if encoding:
loop = asyncio.get_running_loop()
@@ -961,7 +1044,14 @@ class AsyncClient(Client):
async for chunk in response.content.iter_any():
yield chunk
- return StreamContext(response, byte_iterator())
+ class _RawStreamSource:
+ def close(self):
+ try:
+ response.close()
+ finally:
+ _release_lease(response)
+
+ return StreamContext(_RawStreamSource(), byte_iterator())
def _prep_raw_query(self, query, parameters, settings, fmt, use_database, external_data):
"""
@@ -1578,6 +1668,7 @@ class AsyncClient(Client):
params.update(self._validate_settings(context.settings))
headers = dict_copy(headers, context.transport_settings)
+ response = None
try:
response = await self._raw_request(
active_source.async_generator(),
@@ -1587,6 +1678,7 @@ class AsyncClient(Client):
retry_body=rebuild_body,
)
logger.debug("Context insert response code: %d", response.status)
+ summary = self._summary(response)
except Exception:
await active_source.close()
@@ -1598,8 +1690,11 @@ class AsyncClient(Client):
finally:
await active_source.close()
context.data = None
+ if response is not None:
+ response.close()
+ _release_lease(response)
- return QuerySummary(self._summary(response))
+ return QuerySummary(summary)
async def insert_df( # type: ignore[override]
self,
@@ -1684,8 +1779,12 @@ class AsyncClient(Client):
headers = dict_copy(headers, transport_settings)
response = await self._raw_request(insert_block, params, headers, server_wait=False)
- logger.debug("Raw insert response code: %d", response.status)
- return QuerySummary(self._summary(response))
+ try:
+ logger.debug("Raw insert response code: %d", response.status)
+ return QuerySummary(self._summary(response))
+ finally:
+ response.close()
+ _release_lease(response)
def _add_integration_tag(self, name: str):
"""
@@ -1799,9 +1898,10 @@ class AsyncClient(Client):
if self._last_pool_reset is None:
self._last_pool_reset = now
elif self._last_pool_reset < now - reset_seconds:
+ # Stamp before await so concurrent callers don't all queue redundant resets.
+ self._last_pool_reset = now
logger.debug("connection expiration - resetting connection pool")
await self.close_connections()
- self._last_pool_reset = now
final_params = dict_copy(self._client_settings, params)
if server_wait:
@@ -1830,6 +1930,17 @@ class AsyncClient(Client):
)
self._active_session = query_session
+ # Snapshot+acquire under lock so close_connections() can't pass the
+ # drain check between our session read and our refcount increment.
+ async with self._session_lock:
+ lease = self._session_lease
+ if lease is None or lease.session.closed:
+ if query_session:
+ self._active_session = None
+ raise ProgrammingError("Client session is unavailable; the client may have been closed.")
+ session = lease.session
+ lease.acquire()
+ lease_released = False
try:
# Construct full URL (aiohttp doesn't have base_url)
url = f"{self.url}/"
@@ -1857,8 +1968,11 @@ class AsyncClient(Client):
else:
request_kwargs["data"] = data
- response = await self._session.request(**request_kwargs)
+ response = await session.request(**request_kwargs)
if 200 <= response.status < 300 and not response.headers.get(ex_header):
+ # Caller releases lease after consuming the body.
+ response._lease_release = _one_shot(lease.release)
+ lease_released = True
return response
if response.status in (429, 503, 504):
@@ -1888,6 +2002,8 @@ class AsyncClient(Client):
raise OperationalError(f"Network Error: {str(e)}") from e
finally:
+ if not lease_released:
+ lease.release()
if query_session:
self._active_session = None
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py
index c88af65c345..7c72e570338 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py
@@ -199,7 +199,7 @@ class Client(ABC):
self.server_version, server_tz = tuple(self.command("SELECT version(), timezone()", use_database=False))
try:
server_tz_info = tzutil.resolve_zone(server_tz)
- server_tz_info, self._dst_safe = tzutil.normalize_timezone(server_tz_info)
+ server_tz_info, self._dst_safe = tzutil.normalize_timezone(server_tz_info, trust_fixed_offset=True)
self.server_tz = server_tz_info
except ZoneInfoNotFoundError:
logger.warning(
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py
index ac0e28ad3dd..d135e94194d 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py
@@ -52,6 +52,12 @@ def close_managers():
manager.clear()
+def resolve_ca_cert(ca_cert: str | None) -> str | None:
+ if ca_cert == "certifi":
+ return certifi.where()
+ return ca_cert
+
+
def get_pool_manager_options(
keep_interval: int = DEFAULT_KEEP_INTERVAL,
keep_count: int = DEFAULT_KEEP_COUNT,
@@ -73,8 +79,7 @@ def get_pool_manager_options(
socket_options.append((SOCKET_TCP, getattr(socket, "TCP_KEEPALIVE", 0x10), keep_interval))
options["maxsize"] = options.get("maxsize", 8)
options["retries"] = options.get("retries", 1)
- if ca_cert == "certifi":
- ca_cert = certifi.where()
+ ca_cert = resolve_ca_cert(ca_cert)
options["cert_reqs"] = "CERT_REQUIRED" if verify else "CERT_NONE"
if ca_cert:
options["ca_certs"] = ca_cert
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/streaming.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/streaming.py
index c246c7e3b2a..0a20bfb7697 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/streaming.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/streaming.py
@@ -46,6 +46,11 @@ class StreamingResponseSource(Closable):
self._producer_error: Exception | None = None
self._producer_completed = False
+ def _release_lease(self):
+ release = getattr(self.response, "_lease_release", None)
+ if release is not None:
+ release()
+
async def start_producer(self, loop: asyncio.AbstractEventLoop):
"""Start the async producer task.
Must be called from the event loop thread before consuming.
@@ -78,6 +83,7 @@ class StreamingResponseSource(Closable):
finally:
self.queue.shutdown()
+ self._release_lease()
self._producer_task = loop.create_task(producer())
self._producer_started.set()
@@ -179,6 +185,7 @@ class StreamingResponseSource(Closable):
if not self._producer_completed:
self.response.close()
await asyncio.sleep(0.05)
+ self._release_lease()
def close(self):
"""Synchronous cleanup resources"""
@@ -190,6 +197,7 @@ class StreamingResponseSource(Closable):
if self.response and not self.response.closed:
if not self._producer_completed:
self.response.close()
+ self._release_lease()
class StreamingFileAdapter:
diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/tzutil.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/tzutil.py
index 612620a07db..7eb330da8bb 100644
--- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/tzutil.py
+++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/tzutil.py
@@ -1,6 +1,7 @@
import os
+import re
import zoneinfo
-from datetime import datetime, timezone, tzinfo
+from datetime import datetime, timedelta, timezone, tzinfo
tzlocal = None
try:
@@ -44,17 +45,37 @@ UTC_EQUIVALENTS = (
# extra to get the IANA zone data.
TZDATA_HINT = "install the tzdata package (e.g. `pip install clickhouse-connect[tzdata]`) if no system zoneinfo database is available"
+# ClickHouse servers without an IANA tz database report Fixed/UTC+HH:MM:SS
+# or Fixed/UTC-HH:MM:SS for any non-UTC
+# timezone (in column types, X-ClickHouse-Timezone, and SELECT timezone()). Hours, minutes,
+# and seconds are always zero-padded to two digits; the server rejects single-digit forms
+# like `Fixed/UTC+5:30:00`. Range validation is done in resolve_zone() rather than in the
+# regex because the server also accepts the boundary value +/-24:00:00, which Python's
+# datetime.timezone cannot represent as a non-UTC offset and must not be silently collapsed to UTC.
+_FIXED_TZ_RE = re.compile(r"^Fixed/UTC([+-])(\d{2}):(\d{2}):(\d{2})$")
+
def resolve_zone(tz_name: str) -> tzinfo:
"""Resolve an IANA timezone name to a tzinfo.
Short-circuits UTC-equivalent names to datetime.timezone.utc so that representing UTC
- does not require an IANA zoneinfo database to be available on the host. Other names are
- resolved via zoneinfo.ZoneInfo and will raise ZoneInfoNotFoundError if the host has
- no system zoneinfo and the tzdata package is not installed.
+ does not require an IANA zoneinfo database to be available on the host. Also recognizes
+ ClickHouse's Fixed/UTC+HH:MM:SS and Fixed/UTC-HH:MM:SS offset format
+ (emitted by servers without IANA tz data) and returns a stdlib datetime.timezone.
+ Other names are resolved via zoneinfo.ZoneInfo and will raise ZoneInfoNotFoundError
+ if the host has no system zoneinfo and the tzdata package is not installed.
"""
if tz_name in UTC_EQUIVALENTS:
return timezone.utc
+ fixed = _FIXED_TZ_RE.match(tz_name)
+ if fixed:
+ sign, hh, mm, ss = fixed.groups()
+ h, m, s = int(hh), int(mm), int(ss)
+ if h < 24 and m < 60 and s < 60:
+ offset = timedelta(hours=h, minutes=m, seconds=s)
+ if sign == "-":
+ offset = -offset
+ return timezone.utc if offset == timedelta(0) else timezone(offset)
try:
return zoneinfo.ZoneInfo(tz_name)
except ValueError as ex:
@@ -63,11 +84,23 @@ def resolve_zone(tz_name: str) -> tzinfo:
raise zoneinfo.ZoneInfoNotFoundError(str(ex)) from ex
-def normalize_timezone(tz: tzinfo) -> tuple[tzinfo, bool]:
+def normalize_timezone(tz: tzinfo, trust_fixed_offset: bool = False) -> tuple[tzinfo, bool]:
+ # Server-init paths pass trust_fixed_offset=True for tzs derived from a ClickHouse-reported
+ # Fixed/UTC+HH:MM:SS or Fixed/UTC-HH:MM:SS string. Those are self-describing and
+ # DST-safe by definition, but their tzname(None) (e.g. "UTC+05:30") is not an IANA
+ # key and would otherwise fall through to the unsafe branch, silently dropping the
+ # server tz under tz_source="auto".
+ #
+ # The local-init path (bottom of this module) deliberately does NOT set this flag because
+ # a stdlib datetime.timezone returned from datetime.now().astimezone().tzinfo is the current
+ # local offset (e.g. PDT), and we want the tzlocal-recovery branch below to upgrade it to
+ # a real IANA zone so the local time tracks across DST.
+ if trust_fixed_offset and isinstance(tz, timezone):
+ return tz, True
+
# ZoneInfo exposes the IANA key on `.key`; fall back to tzname(None) for other tzinfo
- # subclasses (datetime.timezone, fixed offsets). pytz used to return the IANA name from
- # tzname(None), but ZoneInfo returns None, which would collapse every named zone into the
- # "unsafe" fallback branch.
+ # subclasses. pytz used to return the IANA name from tzname(None), but ZoneInfo returns
+ # None, which would collapse every named zone into the "unsafe" fallback branch.
tz_key = getattr(tz, "key", None) or tz.tzname(None)
if tz_key in UTC_EQUIVALENTS:
diff --git a/contrib/python/clickhouse-connect/ya.make b/contrib/python/clickhouse-connect/ya.make
index f8f72d103b4..5a14deb8876 100644
--- a/contrib/python/clickhouse-connect/ya.make
+++ b/contrib/python/clickhouse-connect/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(1.0.0)
+VERSION(1.0.1)
LICENSE(Apache-2.0)