diff options
| author | robot-piglet <[email protected]> | 2026-06-03 11:02:34 +0300 |
|---|---|---|
| committer | robot-piglet <[email protected]> | 2026-06-03 11:26:06 +0300 |
| commit | 23e2a014ee64590656fe587d35005c5aac02ccd8 (patch) | |
| tree | 5c7bc29ac1266549dcf83076c5748646ece582da /contrib/python/clickhouse-connect | |
| parent | 37780c25523e7295b8b4887fb28fbb05fbb8e37d (diff) | |
Intermediate changes
commit_hash:f088ee5140fe7cfb904b3d2eef65c385a73148fb
Diffstat (limited to 'contrib/python/clickhouse-connect')
9 files changed, 231 insertions, 45 deletions
diff --git a/contrib/python/clickhouse-connect/.dist-info/METADATA b/contrib/python/clickhouse-connect/.dist-info/METADATA index 4a4168b9536..3f24e9dbb73 100644 --- a/contrib/python/clickhouse-connect/.dist-info/METADATA +++ b/contrib/python/clickhouse-connect/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.4 Name: clickhouse-connect -Version: 1.0.0 +Version: 1.0.1 Summary: ClickHouse Database Core Driver for Python, Pandas, and Superset Home-page: https://github.com/ClickHouse/clickhouse-connect Author: ClickHouse Inc. diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/_version.py b/contrib/python/clickhouse-connect/clickhouse_connect/_version.py index 11a716ec1fe..86774adcc16 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/_version.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/_version.py @@ -1 +1 @@ -version = "1.0.0" +version = "1.0.1" diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/compiler.py b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/compiler.py index 9443cee553e..bbe7a4e9640 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/compiler.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/sql/compiler.py @@ -40,6 +40,10 @@ def _resolve_ch_type_name(sqla_type): class ChStatementCompiler(SQLCompiler): + def _raise_on_escape(self, binary, operator_name: str): + if binary.modifiers.get("escape") is not None: + raise CompileError(f"ClickHouse does not support the ESCAPE clause on {operator_name}") + def visit_delete(self, delete_stmt, visiting_cte=None, **kw): table = delete_stmt.table text = f"DELETE FROM {format_table(table)}" @@ -261,3 +265,23 @@ class ChStatementCompiler(SQLCompiler): if mods and alias in mods: result += " " + mods[alias] return result + + def visit_like_op_binary(self, binary, operator, **kw): + self._raise_on_escape(binary, "LIKE") + return super().visit_like_op_binary(binary, operator, **kw) + + def visit_not_like_op_binary(self, binary, operator, **kw): + self._raise_on_escape(binary, "LIKE") + return super().visit_not_like_op_binary(binary, operator, **kw) + + def visit_ilike_op_binary(self, binary, operator, **kw): + self._raise_on_escape(binary, "ILIKE") + left = self.process(binary.left, **kw) + right = self.process(binary.right, **kw) + return f"{left} ILIKE {right}" + + def visit_not_ilike_op_binary(self, binary, operator, **kw): + self._raise_on_escape(binary, "ILIKE") + left = self.process(binary.left, **kw) + right = self.process(binary.right, **kw) + return f"{left} NOT ILIKE {right}" diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py index 7e8c80187e7..973018cc521 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py @@ -98,6 +98,53 @@ class BytesSource: """No-op close method for compatibility.""" +class _SessionLease: + """An aiohttp.ClientSession with an in-flight request count, so close() + can wait for outstanding requests to drain before tearing down the session.""" + + __slots__ = ("session", "_inflight", "_drained") + + def __init__(self, session: aiohttp.ClientSession): + self.session = session + self._inflight = 0 + self._drained = asyncio.Event() + self._drained.set() + + def acquire(self) -> None: + self._inflight += 1 + if self._inflight == 1: + self._drained.clear() + + def release(self) -> None: + self._inflight -= 1 + if self._inflight == 0: + self._drained.set() + + async def wait_drained(self) -> None: + await self._drained.wait() + + +def _one_shot(fn: Callable[[], None]) -> Callable[[], None]: + """Returns a wrapper that invokes fn at most once.""" + fired = False + + def call(): + nonlocal fired + if not fired: + fired = True + fn() + + return call + + +def _release_lease(response: aiohttp.ClientResponse | None) -> None: + if response is None: + return + release = getattr(response, "_lease_release", None) + if release is not None: + release() + + class AsyncClient(Client): valid_transport_settings = { "database", @@ -231,7 +278,7 @@ class AsyncClient(Client): ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE elif ca_cert: - ssl_context.load_verify_locations(ca_cert) + ssl_context.load_verify_locations(httputil.resolve_ca_cert(ca_cert)) if client_cert: ssl_context.load_cert_chain(client_cert, client_cert_key) @@ -250,7 +297,8 @@ class AsyncClient(Client): if sys.version_info < (3, 12, 7) or sys.version_info[:3] == (3, 13, 0): self._connector_kwargs["enable_cleanup_closed"] = True - self._session = None + self._session_lease: _SessionLease | None = None + self._session_lock = asyncio.Lock() self._read_format = "Native" self._write_format = "Native" self._transform = NativeTransform() @@ -284,6 +332,15 @@ class AsyncClient(Client): autoconnect=False, ) + @property + def _session(self) -> aiohttp.ClientSession | None: + lease = self._session_lease + return lease.session if lease is not None else None + + @_session.setter + def _session(self, value: aiohttp.ClientSession | None) -> None: + self._session_lease = _SessionLease(value) if value is not None else None + async def _initialize(self): """ Async equivalent of Client._init_common_settings. @@ -311,7 +368,7 @@ class AsyncClient(Client): self.server_version, server_tz_str = tuple(row) try: server_tz = tzutil.resolve_zone(server_tz_str) - server_tz, self._dst_safe = tzutil.normalize_timezone(server_tz) + server_tz, self._dst_safe = tzutil.normalize_timezone(server_tz, trust_fixed_offset=True) self.server_tz = server_tz except zoneinfo.ZoneInfoNotFoundError: logger.warning( @@ -420,22 +477,31 @@ class AsyncClient(Client): return False async def close(self): # type: ignore[override] - if self._session: - await self._session.close() + async with self._session_lock: + old_lease = self._session_lease + self._session_lease = None + if old_lease is not None: + await old_lease.wait_drained() + await old_lease.session.close() async def close_connections(self): # type: ignore[override] - """Close all pooled connections and recreate session""" - if self._session: - await self._session.close() - connector = aiohttp.TCPConnector(**self._connector_kwargs) - self._session = aiohttp.ClientSession( - connector=connector, - timeout=self._timeout, - headers=self.headers, - trust_env=False, - auto_decompress=False, - skip_auto_headers={"Accept-Encoding"}, - ) + """Rotate the connection pool: new requests use a fresh session; in-flight + requests keep using the old session until they complete, then it's closed.""" + async with self._session_lock: + old_lease = self._session_lease + connector = aiohttp.TCPConnector(**self._connector_kwargs) + new_session = aiohttp.ClientSession( + connector=connector, + timeout=self._timeout, + headers=self.headers, + trust_env=False, + auto_decompress=False, + skip_auto_headers={"Accept-Encoding"}, + ) + self._session_lease = _SessionLease(new_session) + if old_lease is not None: + await old_lease.wait_drained() + await old_lease.session.close() def set_client_setting(self, key, value): str_value = self._validate_setting(key, value, common.get_setting("invalid_setting_action")) @@ -496,8 +562,11 @@ class AsyncClient(Client): params.update(context.bind_params) response = await self._raw_request(fmt_json_query, params, headers, retries=self.query_retries) - body = await response.read() - encoding = response.headers.get("Content-Encoding") + try: + body = await response.read() + encoding = response.headers.get("Content-Encoding") + finally: + _release_lease(response) loop = asyncio.get_running_loop() def decompress_and_parse_json(): @@ -874,9 +943,12 @@ class AsyncClient(Client): headers = dict_copy(headers, transport_settings) method = "POST" if payload or files else "GET" response = await self._raw_request(payload, params, headers, files=files, method=method, server_wait=False) - body = await response.read() - encoding = response.headers.get("Content-Encoding") - summary = self._summary(response) + try: + body = await response.read() + encoding = response.headers.get("Content-Encoding") + summary = self._summary(response) + finally: + _release_lease(response) if not body: return QuerySummary(summary) @@ -902,14 +974,22 @@ class AsyncClient(Client): return await loop.run_in_executor(None, decompress_and_decode) async def ping(self) -> bool: # type: ignore[override] + async with self._session_lock: + lease = self._session_lease + if lease is None or lease.session.closed: + return False + session = lease.session + lease.acquire() try: url = f"{self.url}/ping" timeout = aiohttp.ClientTimeout(total=3.0) - async with self._session.get(url, timeout=timeout) as response: + async with session.get(url, timeout=timeout) as response: return 200 <= response.status < 300 except (aiohttp.ClientError, asyncio.TimeoutError): logger.debug("ping failed", exc_info=True) return False + finally: + lease.release() async def raw_query( # type: ignore[override] self, @@ -929,8 +1009,11 @@ class AsyncClient(Client): headers = dict_copy(headers, transport_settings) response = await self._raw_request(body, params, headers=headers, files=files, retries=self.query_retries) - response_data = await response.read() - encoding = response.headers.get("Content-Encoding") + try: + response_data = await response.read() + encoding = response.headers.get("Content-Encoding") + finally: + _release_lease(response) if encoding: loop = asyncio.get_running_loop() @@ -961,7 +1044,14 @@ class AsyncClient(Client): async for chunk in response.content.iter_any(): yield chunk - return StreamContext(response, byte_iterator()) + class _RawStreamSource: + def close(self): + try: + response.close() + finally: + _release_lease(response) + + return StreamContext(_RawStreamSource(), byte_iterator()) def _prep_raw_query(self, query, parameters, settings, fmt, use_database, external_data): """ @@ -1578,6 +1668,7 @@ class AsyncClient(Client): params.update(self._validate_settings(context.settings)) headers = dict_copy(headers, context.transport_settings) + response = None try: response = await self._raw_request( active_source.async_generator(), @@ -1587,6 +1678,7 @@ class AsyncClient(Client): retry_body=rebuild_body, ) logger.debug("Context insert response code: %d", response.status) + summary = self._summary(response) except Exception: await active_source.close() @@ -1598,8 +1690,11 @@ class AsyncClient(Client): finally: await active_source.close() context.data = None + if response is not None: + response.close() + _release_lease(response) - return QuerySummary(self._summary(response)) + return QuerySummary(summary) async def insert_df( # type: ignore[override] self, @@ -1684,8 +1779,12 @@ class AsyncClient(Client): headers = dict_copy(headers, transport_settings) response = await self._raw_request(insert_block, params, headers, server_wait=False) - logger.debug("Raw insert response code: %d", response.status) - return QuerySummary(self._summary(response)) + try: + logger.debug("Raw insert response code: %d", response.status) + return QuerySummary(self._summary(response)) + finally: + response.close() + _release_lease(response) def _add_integration_tag(self, name: str): """ @@ -1799,9 +1898,10 @@ class AsyncClient(Client): if self._last_pool_reset is None: self._last_pool_reset = now elif self._last_pool_reset < now - reset_seconds: + # Stamp before await so concurrent callers don't all queue redundant resets. + self._last_pool_reset = now logger.debug("connection expiration - resetting connection pool") await self.close_connections() - self._last_pool_reset = now final_params = dict_copy(self._client_settings, params) if server_wait: @@ -1830,6 +1930,17 @@ class AsyncClient(Client): ) self._active_session = query_session + # Snapshot+acquire under lock so close_connections() can't pass the + # drain check between our session read and our refcount increment. + async with self._session_lock: + lease = self._session_lease + if lease is None or lease.session.closed: + if query_session: + self._active_session = None + raise ProgrammingError("Client session is unavailable; the client may have been closed.") + session = lease.session + lease.acquire() + lease_released = False try: # Construct full URL (aiohttp doesn't have base_url) url = f"{self.url}/" @@ -1857,8 +1968,11 @@ class AsyncClient(Client): else: request_kwargs["data"] = data - response = await self._session.request(**request_kwargs) + response = await session.request(**request_kwargs) if 200 <= response.status < 300 and not response.headers.get(ex_header): + # Caller releases lease after consuming the body. + response._lease_release = _one_shot(lease.release) + lease_released = True return response if response.status in (429, 503, 504): @@ -1888,6 +2002,8 @@ class AsyncClient(Client): raise OperationalError(f"Network Error: {str(e)}") from e finally: + if not lease_released: + lease.release() if query_session: self._active_session = None diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py index c88af65c345..7c72e570338 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/client.py @@ -199,7 +199,7 @@ class Client(ABC): self.server_version, server_tz = tuple(self.command("SELECT version(), timezone()", use_database=False)) try: server_tz_info = tzutil.resolve_zone(server_tz) - server_tz_info, self._dst_safe = tzutil.normalize_timezone(server_tz_info) + server_tz_info, self._dst_safe = tzutil.normalize_timezone(server_tz_info, trust_fixed_offset=True) self.server_tz = server_tz_info except ZoneInfoNotFoundError: logger.warning( diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py index ac0e28ad3dd..d135e94194d 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httputil.py @@ -52,6 +52,12 @@ def close_managers(): manager.clear() +def resolve_ca_cert(ca_cert: str | None) -> str | None: + if ca_cert == "certifi": + return certifi.where() + return ca_cert + + def get_pool_manager_options( keep_interval: int = DEFAULT_KEEP_INTERVAL, keep_count: int = DEFAULT_KEEP_COUNT, @@ -73,8 +79,7 @@ def get_pool_manager_options( socket_options.append((SOCKET_TCP, getattr(socket, "TCP_KEEPALIVE", 0x10), keep_interval)) options["maxsize"] = options.get("maxsize", 8) options["retries"] = options.get("retries", 1) - if ca_cert == "certifi": - ca_cert = certifi.where() + ca_cert = resolve_ca_cert(ca_cert) options["cert_reqs"] = "CERT_REQUIRED" if verify else "CERT_NONE" if ca_cert: options["ca_certs"] = ca_cert diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/streaming.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/streaming.py index c246c7e3b2a..0a20bfb7697 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/streaming.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/streaming.py @@ -46,6 +46,11 @@ class StreamingResponseSource(Closable): self._producer_error: Exception | None = None self._producer_completed = False + def _release_lease(self): + release = getattr(self.response, "_lease_release", None) + if release is not None: + release() + async def start_producer(self, loop: asyncio.AbstractEventLoop): """Start the async producer task. Must be called from the event loop thread before consuming. @@ -78,6 +83,7 @@ class StreamingResponseSource(Closable): finally: self.queue.shutdown() + self._release_lease() self._producer_task = loop.create_task(producer()) self._producer_started.set() @@ -179,6 +185,7 @@ class StreamingResponseSource(Closable): if not self._producer_completed: self.response.close() await asyncio.sleep(0.05) + self._release_lease() def close(self): """Synchronous cleanup resources""" @@ -190,6 +197,7 @@ class StreamingResponseSource(Closable): if self.response and not self.response.closed: if not self._producer_completed: self.response.close() + self._release_lease() class StreamingFileAdapter: diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/tzutil.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/tzutil.py index 612620a07db..7eb330da8bb 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/tzutil.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/tzutil.py @@ -1,6 +1,7 @@ import os +import re import zoneinfo -from datetime import datetime, timezone, tzinfo +from datetime import datetime, timedelta, timezone, tzinfo tzlocal = None try: @@ -44,17 +45,37 @@ UTC_EQUIVALENTS = ( # extra to get the IANA zone data. TZDATA_HINT = "install the tzdata package (e.g. `pip install clickhouse-connect[tzdata]`) if no system zoneinfo database is available" +# ClickHouse servers without an IANA tz database report Fixed/UTC+HH:MM:SS +# or Fixed/UTC-HH:MM:SS for any non-UTC +# timezone (in column types, X-ClickHouse-Timezone, and SELECT timezone()). Hours, minutes, +# and seconds are always zero-padded to two digits; the server rejects single-digit forms +# like `Fixed/UTC+5:30:00`. Range validation is done in resolve_zone() rather than in the +# regex because the server also accepts the boundary value +/-24:00:00, which Python's +# datetime.timezone cannot represent as a non-UTC offset and must not be silently collapsed to UTC. +_FIXED_TZ_RE = re.compile(r"^Fixed/UTC([+-])(\d{2}):(\d{2}):(\d{2})$") + def resolve_zone(tz_name: str) -> tzinfo: """Resolve an IANA timezone name to a tzinfo. Short-circuits UTC-equivalent names to datetime.timezone.utc so that representing UTC - does not require an IANA zoneinfo database to be available on the host. Other names are - resolved via zoneinfo.ZoneInfo and will raise ZoneInfoNotFoundError if the host has - no system zoneinfo and the tzdata package is not installed. + does not require an IANA zoneinfo database to be available on the host. Also recognizes + ClickHouse's Fixed/UTC+HH:MM:SS and Fixed/UTC-HH:MM:SS offset format + (emitted by servers without IANA tz data) and returns a stdlib datetime.timezone. + Other names are resolved via zoneinfo.ZoneInfo and will raise ZoneInfoNotFoundError + if the host has no system zoneinfo and the tzdata package is not installed. """ if tz_name in UTC_EQUIVALENTS: return timezone.utc + fixed = _FIXED_TZ_RE.match(tz_name) + if fixed: + sign, hh, mm, ss = fixed.groups() + h, m, s = int(hh), int(mm), int(ss) + if h < 24 and m < 60 and s < 60: + offset = timedelta(hours=h, minutes=m, seconds=s) + if sign == "-": + offset = -offset + return timezone.utc if offset == timedelta(0) else timezone(offset) try: return zoneinfo.ZoneInfo(tz_name) except ValueError as ex: @@ -63,11 +84,23 @@ def resolve_zone(tz_name: str) -> tzinfo: raise zoneinfo.ZoneInfoNotFoundError(str(ex)) from ex -def normalize_timezone(tz: tzinfo) -> tuple[tzinfo, bool]: +def normalize_timezone(tz: tzinfo, trust_fixed_offset: bool = False) -> tuple[tzinfo, bool]: + # Server-init paths pass trust_fixed_offset=True for tzs derived from a ClickHouse-reported + # Fixed/UTC+HH:MM:SS or Fixed/UTC-HH:MM:SS string. Those are self-describing and + # DST-safe by definition, but their tzname(None) (e.g. "UTC+05:30") is not an IANA + # key and would otherwise fall through to the unsafe branch, silently dropping the + # server tz under tz_source="auto". + # + # The local-init path (bottom of this module) deliberately does NOT set this flag because + # a stdlib datetime.timezone returned from datetime.now().astimezone().tzinfo is the current + # local offset (e.g. PDT), and we want the tzlocal-recovery branch below to upgrade it to + # a real IANA zone so the local time tracks across DST. + if trust_fixed_offset and isinstance(tz, timezone): + return tz, True + # ZoneInfo exposes the IANA key on `.key`; fall back to tzname(None) for other tzinfo - # subclasses (datetime.timezone, fixed offsets). pytz used to return the IANA name from - # tzname(None), but ZoneInfo returns None, which would collapse every named zone into the - # "unsafe" fallback branch. + # subclasses. pytz used to return the IANA name from tzname(None), but ZoneInfo returns + # None, which would collapse every named zone into the "unsafe" fallback branch. tz_key = getattr(tz, "key", None) or tz.tzname(None) if tz_key in UTC_EQUIVALENTS: diff --git a/contrib/python/clickhouse-connect/ya.make b/contrib/python/clickhouse-connect/ya.make index f8f72d103b4..5a14deb8876 100644 --- a/contrib/python/clickhouse-connect/ya.make +++ b/contrib/python/clickhouse-connect/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(1.0.0) +VERSION(1.0.1) LICENSE(Apache-2.0) |
