diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-07-23 07:47:34 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-07-23 07:56:54 +0300 |
commit | e9497f65d8bbe478a583fa31b74e03a8d342ff5f (patch) | |
tree | 48793f79ab80ae9a82e77744ed89f82aeee8e59a | |
parent | 973312112362cf03ce0f7d4e5fdbcb63d1166670 (diff) | |
download | ydb-e9497f65d8bbe478a583fa31b74e03a8d342ff5f.tar.gz |
Intermediate changes
16 files changed, 1248 insertions, 104 deletions
diff --git a/contrib/python/clickhouse-connect/.dist-info/METADATA b/contrib/python/clickhouse-connect/.dist-info/METADATA index dac0751d94..7a4ae1cbbf 100644 --- a/contrib/python/clickhouse-connect/.dist-info/METADATA +++ b/contrib/python/clickhouse-connect/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: clickhouse-connect -Version: 0.7.15 +Version: 0.7.16 Summary: ClickHouse Database Core Driver for Python, Pandas, and Superset Home-page: https://github.com/ClickHouse/clickhouse-connect Author: ClickHouse Inc. @@ -39,14 +39,14 @@ Requires-Dist: tzlocal >=4.0 ; extra == 'tzlocal' ## ClickHouse Connect A high performance core database driver for connecting ClickHouse to Python, Pandas, and Superset + * Pandas DataFrames * Numpy Arrays * PyArrow Tables * Superset Connector * SQLAlchemy 1.3 and 1.4 (limited feature set) -ClickHouse Connect currently uses the ClickHouse HTTP interface for maximum compatibility. - +ClickHouse Connect currently uses the ClickHouse HTTP interface for maximum compatibility. ### Installation @@ -54,28 +54,30 @@ ClickHouse Connect currently uses the ClickHouse HTTP interface for maximum comp pip install clickhouse-connect ``` -ClickHouse Connect requires Python 3.8 or higher. - +ClickHouse Connect requires Python 3.8 or higher. ### Superset Connectivity -ClickHouse Connect is fully integrated with Apache Superset. Previous versions of ClickHouse Connect utilized a + +ClickHouse Connect is fully integrated with Apache Superset. Previous versions of ClickHouse Connect utilized a dynamically loaded Superset Engine Spec, but as of Superset v2.1.0 the engine spec was incorporated into the main -Apache Superset project and removed from clickhouse-connect in v0.6.0. If you have issues connecting to earlier +Apache Superset project and removed from clickhouse-connect in v0.6.0. If you have issues connecting to earlier versions of Superset, please use clickhouse-connect v0.5.25. When creating a Superset Data Source, either use the provided connection dialog, or a SqlAlchemy DSN in the form `clickhousedb://{username}:{password}@{host}:{port}`. - ### SQLAlchemy Implementation + ClickHouse Connect incorporates a minimal SQLAlchemy implementation (without any ORM features) for compatibility with -Superset. It has only been tested against SQLAlchemy versions 1.3.x and 1.4.x, and is unlikely to work with more +Superset. It has only been tested against SQLAlchemy versions 1.3.x and 1.4.x, and is unlikely to work with more complex SQLAlchemy applications. +### Asyncio Support + +ClickHouse Connect provides an async wrapper, so that it is possible to use the client in an `asyncio` environment. +See the [run_async example](./examples/run_async.py) for more details. ### Complete Documentation + The documentation for ClickHouse Connect has moved to [ClickHouse Docs](https://clickhouse.com/docs/en/integrations/language-clients/python/intro) - - - diff --git a/contrib/python/clickhouse-connect/README.md b/contrib/python/clickhouse-connect/README.md index 59f419e3f2..58defa499d 100644 --- a/contrib/python/clickhouse-connect/README.md +++ b/contrib/python/clickhouse-connect/README.md @@ -1,14 +1,14 @@ ## ClickHouse Connect A high performance core database driver for connecting ClickHouse to Python, Pandas, and Superset + * Pandas DataFrames * Numpy Arrays * PyArrow Tables * Superset Connector * SQLAlchemy 1.3 and 1.4 (limited feature set) -ClickHouse Connect currently uses the ClickHouse HTTP interface for maximum compatibility. - +ClickHouse Connect currently uses the ClickHouse HTTP interface for maximum compatibility. ### Installation @@ -16,28 +16,30 @@ ClickHouse Connect currently uses the ClickHouse HTTP interface for maximum comp pip install clickhouse-connect ``` -ClickHouse Connect requires Python 3.8 or higher. - +ClickHouse Connect requires Python 3.8 or higher. ### Superset Connectivity -ClickHouse Connect is fully integrated with Apache Superset. Previous versions of ClickHouse Connect utilized a + +ClickHouse Connect is fully integrated with Apache Superset. Previous versions of ClickHouse Connect utilized a dynamically loaded Superset Engine Spec, but as of Superset v2.1.0 the engine spec was incorporated into the main -Apache Superset project and removed from clickhouse-connect in v0.6.0. If you have issues connecting to earlier +Apache Superset project and removed from clickhouse-connect in v0.6.0. If you have issues connecting to earlier versions of Superset, please use clickhouse-connect v0.5.25. When creating a Superset Data Source, either use the provided connection dialog, or a SqlAlchemy DSN in the form `clickhousedb://{username}:{password}@{host}:{port}`. - ### SQLAlchemy Implementation + ClickHouse Connect incorporates a minimal SQLAlchemy implementation (without any ORM features) for compatibility with -Superset. It has only been tested against SQLAlchemy versions 1.3.x and 1.4.x, and is unlikely to work with more +Superset. It has only been tested against SQLAlchemy versions 1.3.x and 1.4.x, and is unlikely to work with more complex SQLAlchemy applications. +### Asyncio Support + +ClickHouse Connect provides an async wrapper, so that it is possible to use the client in an `asyncio` environment. +See the [run_async example](./examples/run_async.py) for more details. ### Complete Documentation + The documentation for ClickHouse Connect has moved to [ClickHouse Docs](https://clickhouse.com/docs/en/integrations/language-clients/python/intro) - - - diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/__init__.py index 81ee6dd3a0..737aed5b34 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/__init__.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/__init__.py @@ -1,6 +1,6 @@ -from clickhouse_connect.driver import create_client - +from clickhouse_connect.driver import create_client, create_async_client driver_name = 'clickhousedb' get_client = create_client +get_async_client = create_async_client diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py b/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py index 8dec87cb67..d9a123ac6c 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/__version__.py @@ -1 +1 @@ -version = '0.7.15' +version = '0.7.16' diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py index 2f0b11b132..27e16733d8 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/__init__.py @@ -1,3 +1,4 @@ +import asyncio from inspect import signature from typing import Optional, Union, Dict, Any from urllib.parse import urlparse, parse_qs @@ -7,6 +8,7 @@ from clickhouse_connect.driver.client import Client from clickhouse_connect.driver.common import dict_copy from clickhouse_connect.driver.exceptions import ProgrammingError from clickhouse_connect.driver.httpclient import HttpClient +from clickhouse_connect.driver.asyncclient import AsyncClient # pylint: disable=too-many-arguments,too-many-locals,too-many-branches @@ -68,6 +70,7 @@ def create_client(*, :param server_host_name This is the server host name that will be checked against a TLS certificate for validity. This option can be used if using an ssh_tunnel or other indirect means to an ClickHouse server where the `host` argument refers to the tunnel or proxy and not the actual ClickHouse server + :param autogenerate_session_id If set, this will override the 'autogenerate_session_id' common setting. :return: ClickHouse Connect Client instance """ if dsn: @@ -117,3 +120,78 @@ def default_port(interface: str, secure: bool): if interface.startswith('http'): return 8443 if secure else 8123 raise ValueError('Unrecognized ClickHouse interface') + + +async def create_async_client(*, + host: str = None, + username: str = None, + password: str = '', + database: str = '__default__', + interface: Optional[str] = None, + port: int = 0, + secure: Union[bool, str] = False, + dsn: Optional[str] = None, + settings: Optional[Dict[str, Any]] = None, + generic_args: Optional[Dict[str, Any]] = None, + **kwargs) -> AsyncClient: + """ + The preferred method to get an async ClickHouse Connect Client instance. + For sync version, see create_client. + + Unlike sync version, the 'autogenerate_session_id' setting by default is False. + + :param host: The hostname or IP address of the ClickHouse server. If not set, localhost will be used. + :param username: The ClickHouse username. If not set, the default ClickHouse user will be used. + :param password: The password for username. + :param database: The default database for the connection. If not set, ClickHouse Connect will use the + default database for username. + :param interface: Must be http or https. Defaults to http, or to https if port is set to 8443 or 443 + :param port: The ClickHouse HTTP or HTTPS port. If not set will default to 8123, or to 8443 if secure=True + or interface=https. + :param secure: Use https/TLS. This overrides inferred values from the interface or port arguments. + :param dsn: A string in standard DSN (Data Source Name) format. Other connection values (such as host or user) + will be extracted from this string if not set otherwise. + :param settings: ClickHouse server settings to be used with the session/every request + :param generic_args: Used internally to parse DBAPI connection strings into keyword arguments and ClickHouse settings. + It is not recommended to use this parameter externally + :param kwargs -- Recognized keyword arguments (used by the HTTP client), see below + + :param compress: Enable compression for ClickHouse HTTP inserts and query results. True will select the preferred + compression method (lz4). A str of 'lz4', 'zstd', 'brotli', or 'gzip' can be used to use a specific compression type + :param query_limit: Default LIMIT on returned rows. 0 means no limit + :param connect_timeout: Timeout in seconds for the http connection + :param send_receive_timeout: Read timeout in seconds for http connection + :param client_name: client_name prepended to the HTTP User Agent header. Set this to track client queries + in the ClickHouse system.query_log. + :param send_progress: Deprecated, has no effect. Previous functionality is now automatically determined + :param verify: Verify the server certificate in secure/https mode + :param ca_cert: If verify is True, the file path to Certificate Authority root to validate ClickHouse server + certificate, in .pem format. Ignored if verify is False. This is not necessary if the ClickHouse server + certificate is trusted by the operating system. To trust the maintained list of "global" public root + certificates maintained by the Python 'certifi' package, set ca_cert to 'certifi' + :param client_cert: File path to a TLS Client certificate in .pem format. This file should contain any + applicable intermediate certificates + :param client_cert_key: File path to the private key for the Client Certificate. Required if the private key + is not included the Client Certificate key file + :param session_id ClickHouse session id. If not specified and the common setting 'autogenerate_session_id' + is True, the client will generate a UUID1 session id + :param pool_mgr Optional urllib3 PoolManager for this client. Useful for creating separate connection + pools for multiple client endpoints for applications with many clients + :param http_proxy http proxy address. Equivalent to setting the HTTP_PROXY environment variable + :param https_proxy https proxy address. Equivalent to setting the HTTPS_PROXY environment variable + :param server_host_name This is the server host name that will be checked against a TLS certificate for + validity. This option can be used if using an ssh_tunnel or other indirect means to an ClickHouse server + where the `host` argument refers to the tunnel or proxy and not the actual ClickHouse server + :param autogenerate_session_id If set, this will override the 'autogenerate_session_id' common setting. + :return: ClickHouse Connect Client instance + """ + + def _create_client(): + if 'autogenerate_session_id' not in kwargs: + kwargs['autogenerate_session_id'] = False + return create_client(host=host, username=username, password=password, database=database, interface=interface, + port=port, secure=secure, dsn=dsn, settings=settings, generic_args=generic_args, **kwargs) + + loop = asyncio.get_running_loop() + _client = await loop.run_in_executor(None, _create_client) + return AsyncClient(client=_client) diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py new file mode 100644 index 0000000000..1a4ee7e429 --- /dev/null +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/asyncclient.py @@ -0,0 +1,663 @@ +import asyncio +import io +from datetime import tzinfo +from typing import Optional, Union, Dict, Any, Sequence, Iterable, Generator, BinaryIO + +from clickhouse_connect.driver.client import Client +from clickhouse_connect.driver.common import StreamContext +from clickhouse_connect.driver.external import ExternalData +from clickhouse_connect.driver.query import QueryContext, QueryResult +from clickhouse_connect.driver.summary import QuerySummary +from clickhouse_connect.datatypes.base import ClickHouseType +from clickhouse_connect.driver.insert import InsertContext + + +# pylint: disable=too-many-public-methods, too-many-instance-attributes, too-many-arguments, too-many-locals +class AsyncClient: + """ + AsyncClient is a wrapper around the ClickHouse Client object that allows for async calls to the ClickHouse server. + Internally, each of the methods that uses IO is wrapped in a call to EventLoop.run_in_executor. + """ + + def __init__(self, *, client: Client): + self.client = client + + def set_client_setting(self, key, value): + """ + Set a clickhouse setting for the client after initialization. If a setting is not recognized by ClickHouse, + or the setting is identified as "read_only", this call will either throw a Programming exception or attempt + to send the setting anyway based on the common setting 'invalid_setting_action'. + :param key: ClickHouse setting name + :param value: ClickHouse setting value + """ + self.client.set_client_setting(key=key, value=value) + + def get_client_setting(self, key) -> Optional[str]: + """ + :param key: The setting key + :return: The string value of the setting, if it exists, or None + """ + return self.client.get_client_setting(key=key) + + def min_version(self, version_str: str) -> bool: + """ + Determine whether the connected server is at least the submitted version + For Altinity Stable versions like 22.8.15.25.altinitystable + the last condition in the first list comprehension expression is added + :param version_str: A version string consisting of up to 4 integers delimited by dots + :return: True if version_str is greater than the server_version, False if less than + """ + return self.client.min_version(version_str) + + def close(self): + """ + Subclass implementation to close the connection to the server/deallocate the client + """ + self.client.close() + + async def query(self, + query: Optional[str] = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + column_oriented: Optional[bool] = None, + use_numpy: Optional[bool] = None, + max_str_len: Optional[int] = None, + context: QueryContext = None, + query_tz: Optional[Union[str, tzinfo]] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + external_data: Optional[ExternalData] = None) -> QueryResult: + """ + Main query method for SELECT, DESCRIBE and other SQL statements that return a result matrix. + For parameters, see the create_query_context method. + :return: QueryResult -- data and metadata from response + """ + + def _query(): + return self.client.query(query=query, parameters=parameters, settings=settings, query_formats=query_formats, + column_formats=column_formats, encoding=encoding, use_none=use_none, + column_oriented=column_oriented, use_numpy=use_numpy, max_str_len=max_str_len, + context=context, query_tz=query_tz, column_tzs=column_tzs, + external_data=external_data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _query) + return result + + async def query_column_block_stream(self, + query: Optional[str] = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + context: QueryContext = None, + query_tz: Optional[Union[str, tzinfo]] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + external_data: Optional[ExternalData] = None) -> StreamContext: + """ + Variation of main query method that returns a stream of column oriented blocks. + For parameters, see the create_query_context method. + :return: StreamContext -- Iterable stream context that returns column oriented blocks + """ + + def _query_column_block_stream(): + return self.client.query_column_block_stream(query=query, parameters=parameters, settings=settings, + query_formats=query_formats, column_formats=column_formats, + encoding=encoding, use_none=use_none, context=context, + query_tz=query_tz, column_tzs=column_tzs, + external_data=external_data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _query_column_block_stream) + return result + + async def query_row_block_stream(self, + query: Optional[str] = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + context: QueryContext = None, + query_tz: Optional[Union[str, tzinfo]] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + external_data: Optional[ExternalData] = None) -> StreamContext: + """ + Variation of main query method that returns a stream of row oriented blocks. + For parameters, see the create_query_context method. + :return: StreamContext -- Iterable stream context that returns blocks of rows + """ + + def _query_row_block_stream(): + return self.client.query_row_block_stream(query=query, parameters=parameters, settings=settings, + query_formats=query_formats, column_formats=column_formats, + encoding=encoding, use_none=use_none, context=context, + query_tz=query_tz, column_tzs=column_tzs, + external_data=external_data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _query_row_block_stream) + return result + + async def query_rows_stream(self, + query: Optional[str] = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + context: QueryContext = None, + query_tz: Optional[Union[str, tzinfo]] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + external_data: Optional[ExternalData] = None) -> StreamContext: + """ + Variation of main query method that returns a stream of row oriented blocks. + For parameters, see the create_query_context method. + :return: StreamContext -- Iterable stream context that returns blocks of rows + """ + + def _query_rows_stream(): + return self.client.query_rows_stream(query=query, parameters=parameters, settings=settings, + query_formats=query_formats, column_formats=column_formats, + encoding=encoding, use_none=use_none, context=context, + query_tz=query_tz, column_tzs=column_tzs, + external_data=external_data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _query_rows_stream) + return result + + async def raw_query(self, + query: str, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + fmt: str = None, + use_database: bool = True, + external_data: Optional[ExternalData] = None) -> bytes: + """ + Query method that simply returns the raw ClickHouse format bytes. + :param query: Query statement/format string + :param parameters: Optional dictionary used to format the query + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param fmt: ClickHouse output format + :param use_database Send the database parameter to ClickHouse so the command will be executed in the client + database context + :param external_data External data to send with the query + :return: bytes representing raw ClickHouse return value based on format + """ + + def _raw_query(): + return self.client.raw_query(query=query, parameters=parameters, settings=settings, fmt=fmt, + use_database=use_database, external_data=external_data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _raw_query) + return result + + async def raw_stream(self, query: str, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + fmt: str = None, + use_database: bool = True, + external_data: Optional[ExternalData] = None) -> io.IOBase: + """ + Query method that returns the result as an io.IOBase iterator. + :param query: Query statement/format string + :param parameters: Optional dictionary used to format the query + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param fmt: ClickHouse output format + :param use_database Send the database parameter to ClickHouse so the command will be executed in the client + database context + :param external_data External data to send with the query + :return: io.IOBase stream/iterator for the result + """ + + def _raw_stream(): + return self.client.raw_stream(query=query, parameters=parameters, settings=settings, fmt=fmt, + use_database=use_database, external_data=external_data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _raw_stream) + return result + + async def query_np(self, + query: Optional[str] = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, str]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + max_str_len: Optional[int] = None, + context: QueryContext = None, + external_data: Optional[ExternalData] = None): + """ + Query method that returns the results as a numpy array. + For parameter values, see the create_query_context method. + :return: Numpy array representing the result set + """ + + def _query_np(): + return self.client.query_np(query=query, parameters=parameters, settings=settings, + query_formats=query_formats, column_formats=column_formats, encoding=encoding, + use_none=use_none, max_str_len=max_str_len, context=context, + external_data=external_data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _query_np) + return result + + async def query_np_stream(self, + query: Optional[str] = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, str]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + max_str_len: Optional[int] = None, + context: QueryContext = None, + external_data: Optional[ExternalData] = None) -> StreamContext: + """ + Query method that returns the results as a stream of numpy arrays. + For parameter values, see the create_query_context method. + :return: Generator that yield a numpy array per block representing the result set + """ + + def _query_np_stream(): + return self.client.query_np_stream(query=query, parameters=parameters, settings=settings, + query_formats=query_formats, column_formats=column_formats, + encoding=encoding, use_none=use_none, max_str_len=max_str_len, + context=context, external_data=external_data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _query_np_stream) + return result + + async def query_df(self, + query: Optional[str] = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, str]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + max_str_len: Optional[int] = None, + use_na_values: Optional[bool] = None, + query_tz: Optional[str] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + context: QueryContext = None, + external_data: Optional[ExternalData] = None, + use_extended_dtypes: Optional[bool] = None): + """ + Query method that results the results as a pandas dataframe. + For parameter values, see the create_query_context method. + :return: Pandas dataframe representing the result set + """ + + def _query_df(): + return self.client.query_df(query=query, parameters=parameters, settings=settings, + query_formats=query_formats, column_formats=column_formats, encoding=encoding, + use_none=use_none, max_str_len=max_str_len, use_na_values=use_na_values, + query_tz=query_tz, column_tzs=column_tzs, context=context, + external_data=external_data, use_extended_dtypes=use_extended_dtypes) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _query_df) + return result + + async def query_df_stream(self, + query: Optional[str] = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, str]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + max_str_len: Optional[int] = None, + use_na_values: Optional[bool] = None, + query_tz: Optional[str] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + context: QueryContext = None, + external_data: Optional[ExternalData] = None, + use_extended_dtypes: Optional[bool] = None) -> StreamContext: + """ + Query method that returns the results as a StreamContext. + For parameter values, see the create_query_context method. + :return: Generator that yields a Pandas dataframe per block representing the result set + """ + + def _query_df_stream(): + return self.client.query_df_stream(query=query, parameters=parameters, settings=settings, + query_formats=query_formats, column_formats=column_formats, + encoding=encoding, + use_none=use_none, max_str_len=max_str_len, use_na_values=use_na_values, + query_tz=query_tz, column_tzs=column_tzs, context=context, + external_data=external_data, use_extended_dtypes=use_extended_dtypes) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _query_df_stream) + return result + + def create_query_context(self, + query: Optional[Union[str, bytes]] = None, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + query_formats: Optional[Dict[str, str]] = None, + column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, + encoding: Optional[str] = None, + use_none: Optional[bool] = None, + column_oriented: Optional[bool] = None, + use_numpy: Optional[bool] = False, + max_str_len: Optional[int] = 0, + context: Optional[QueryContext] = None, + query_tz: Optional[Union[str, tzinfo]] = None, + column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, + use_na_values: Optional[bool] = None, + streaming: bool = False, + as_pandas: bool = False, + external_data: Optional[ExternalData] = None, + use_extended_dtypes: Optional[bool] = None) -> QueryContext: + """ + Creates or updates a reusable QueryContext object + :param query: Query statement/format string + :param parameters: Optional dictionary used to format the query + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param query_formats: See QueryContext __init__ docstring + :param column_formats: See QueryContext __init__ docstring + :param encoding: See QueryContext __init__ docstring + :param use_none: Use None for ClickHouse NULL instead of default values. Note that using None in Numpy + arrays will force the numpy array dtype to 'object', which is often inefficient. This effect also + will impact the performance of Pandas dataframes. + :param column_oriented: Deprecated. Controls orientation of the QueryResult result_set property + :param use_numpy: Return QueryResult columns as one-dimensional numpy arrays + :param max_str_len: Limit returned ClickHouse String values to this length, which allows a Numpy + structured array even with ClickHouse variable length String columns. If 0, Numpy arrays for + String columns will always be object arrays + :param context: An existing QueryContext to be updated with any provided parameter values + :param query_tz Either a string or a pytz tzinfo object. (Strings will be converted to tzinfo objects). + Values for any DateTime or DateTime64 column in the query will be converted to Python datetime.datetime + objects with the selected timezone + :param column_tzs A dictionary of column names to tzinfo objects (or strings that will be converted to + tzinfo objects). The timezone will be applied to datetime objects returned in the query + :param use_na_values: Deprecated alias for use_advanced_dtypes + :param as_pandas Return the result columns as pandas.Series objects + :param streaming Marker used to correctly configure streaming queries + :param external_data ClickHouse "external data" to send with query + :param use_extended_dtypes: Only relevant to Pandas Dataframe queries. Use Pandas "missing types", such as + pandas.NA and pandas.NaT for ClickHouse NULL values, as well as extended Pandas dtypes such as IntegerArray + and StringArray. Defaulted to True for query_df methods + :return: Reusable QueryContext + """ + + return self.client.create_query_context(query=query, parameters=parameters, settings=settings, + query_formats=query_formats, column_formats=column_formats, + encoding=encoding, use_none=use_none, + column_oriented=column_oriented, + use_numpy=use_numpy, max_str_len=max_str_len, context=context, + query_tz=query_tz, column_tzs=column_tzs, + use_na_values=use_na_values, + streaming=streaming, as_pandas=as_pandas, + external_data=external_data, + use_extended_dtypes=use_extended_dtypes) + + async def query_arrow(self, + query: str, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + use_strings: Optional[bool] = None, + external_data: Optional[ExternalData] = None): + """ + Query method using the ClickHouse Arrow format to return a PyArrow table + :param query: Query statement/format string + :param parameters: Optional dictionary used to format the query + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary) + :param external_data ClickHouse "external data" to send with query + :return: PyArrow.Table + """ + + def _query_arrow(): + return self.client.query_arrow(query=query, parameters=parameters, settings=settings, + use_strings=use_strings, external_data=external_data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _query_arrow) + return result + + async def query_arrow_stream(self, + query: str, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + settings: Optional[Dict[str, Any]] = None, + use_strings: Optional[bool] = None, + external_data: Optional[ExternalData] = None) -> StreamContext: + """ + Query method that returns the results as a stream of Arrow tables + :param query: Query statement/format string + :param parameters: Optional dictionary used to format the query + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary) + :param external_data ClickHouse "external data" to send with query + :return: Generator that yields a PyArrow.Table for per block representing the result set + """ + + def _query_arrow_stream(): + return self.client.query_arrow_stream(query=query, parameters=parameters, settings=settings, + use_strings=use_strings, external_data=external_data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _query_arrow_stream) + return result + + async def command(self, + cmd: str, + parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, + data: Union[str, bytes] = None, + settings: Dict[str, Any] = None, + use_database: bool = True, + external_data: Optional[ExternalData] = None) -> Union[str, int, Sequence[str], QuerySummary]: + """ + Client method that returns a single value instead of a result set + :param cmd: ClickHouse query/command as a python format string + :param parameters: Optional dictionary of key/values pairs to be formatted + :param data: Optional 'data' for the command (for INSERT INTO in particular) + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param use_database: Send the database parameter to ClickHouse so the command will be executed in the client + database context. Otherwise, no database will be specified with the command. This is useful for determining + the default user database + :param external_data ClickHouse "external data" to send with command/query + :return: Decoded response from ClickHouse as either a string, int, or sequence of strings, or QuerySummary + if no data returned + """ + + def _command(): + return self.client.command(cmd=cmd, parameters=parameters, data=data, settings=settings, + use_database=use_database, external_data=external_data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _command) + return result + + async def ping(self) -> bool: + """ + Validate the connection, does not throw an Exception (see debug logs) + :return: ClickHouse server is up and reachable + """ + + def _ping(): + return self.client.ping() + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _ping) + return result + + async def insert(self, + table: Optional[str] = None, + data: Sequence[Sequence[Any]] = None, + column_names: Union[str, Iterable[str]] = '*', + database: Optional[str] = None, + column_types: Sequence[ClickHouseType] = None, + column_type_names: Sequence[str] = None, + column_oriented: bool = False, + settings: Optional[Dict[str, Any]] = None, + context: InsertContext = None) -> QuerySummary: + """ + Method to insert multiple rows/data matrix of native Python objects. If context is specified arguments + other than data are ignored + :param table: Target table + :param data: Sequence of sequences of Python data + :param column_names: Ordered list of column names or '*' if column types should be retrieved from the + ClickHouse table definition + :param database: Target database -- will use client default database if not specified. + :param column_types: ClickHouse column types. If set then column data does not need to be retrieved from + the server + :param column_type_names: ClickHouse column type names. If set then column data does not need to be + retrieved from the server + :param column_oriented: If true the data is already "pivoted" in column form + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param context: Optional reusable insert context to allow repeated inserts into the same table with + different data batches + :return: QuerySummary with summary information, throws exception if insert fails + """ + + def _insert(): + return self.client.insert(table=table, data=data, column_names=column_names, database=database, + column_types=column_types, column_type_names=column_type_names, + column_oriented=column_oriented, settings=settings, context=context) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _insert) + return result + + async def insert_df(self, table: str = None, + df=None, + database: Optional[str] = None, + settings: Optional[Dict] = None, + column_names: Optional[Sequence[str]] = None, + column_types: Sequence[ClickHouseType] = None, + column_type_names: Sequence[str] = None, + context: InsertContext = None) -> QuerySummary: + """ + Insert a pandas DataFrame into ClickHouse. If context is specified arguments other than df are ignored + :param table: ClickHouse table + :param df: two-dimensional pandas dataframe + :param database: Optional ClickHouse database + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param column_names: An optional list of ClickHouse column names. If not set, the DataFrame column names + will be used + :param column_types: ClickHouse column types. If set then column data does not need to be retrieved from + the server + :param column_type_names: ClickHouse column type names. If set then column data does not need to be + retrieved from the server + :param context: Optional reusable insert context to allow repeated inserts into the same table with + different data batches + :return: QuerySummary with summary information, throws exception if insert fails + """ + + def _insert_df(): + return self.client.insert_df(table=table, df=df, database=database, settings=settings, + column_names=column_names, + column_types=column_types, column_type_names=column_type_names, + context=context) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _insert_df) + return result + + async def insert_arrow(self, table: str, + arrow_table, database: str = None, + settings: Optional[Dict] = None) -> QuerySummary: + """ + Insert a PyArrow table DataFrame into ClickHouse using raw Arrow format + :param table: ClickHouse table + :param arrow_table: PyArrow Table object + :param database: Optional ClickHouse database + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :return: QuerySummary with summary information, throws exception if insert fails + """ + + def _insert_arrow(): + return self.client.insert_arrow(table=table, arrow_table=arrow_table, database=database, settings=settings) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _insert_arrow) + return result + + async def create_insert_context(self, + table: str, + column_names: Optional[Union[str, Sequence[str]]] = None, + database: Optional[str] = None, + column_types: Sequence[ClickHouseType] = None, + column_type_names: Sequence[str] = None, + column_oriented: bool = False, + settings: Optional[Dict[str, Any]] = None, + data: Optional[Sequence[Sequence[Any]]] = None) -> InsertContext: + """ + Builds a reusable insert context to hold state for a duration of an insert + :param table: Target table + :param database: Target database. If not set, uses the client default database + :param column_names: Optional ordered list of column names. If not set, all columns ('*') will be assumed + in the order specified by the table definition + :param database: Target database -- will use client default database if not specified + :param column_types: ClickHouse column types. Optional Sequence of ClickHouseType objects. If neither column + types nor column type names are set, actual column types will be retrieved from the server. + :param column_type_names: ClickHouse column type names. Specified column types by name string + :param column_oriented: If true the data is already "pivoted" in column form + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param data: Initial dataset for insert + :return Reusable insert context + """ + + def _create_insert_context(): + return self.client.create_insert_context(table=table, column_names=column_names, database=database, + column_types=column_types, column_type_names=column_type_names, + column_oriented=column_oriented, settings=settings, data=data) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _create_insert_context) + return result + + async def data_insert(self, context: InsertContext) -> QuerySummary: + """ + Subclass implementation of the data insert + :context: InsertContext parameter object + :return: No return, throws an exception if the insert fails + """ + + def _data_insert(): + return self.client.data_insert(context=context) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _data_insert) + return result + + async def raw_insert(self, table: str, + column_names: Optional[Sequence[str]] = None, + insert_block: Union[str, bytes, Generator[bytes, None, None], BinaryIO] = None, + settings: Optional[Dict] = None, + fmt: Optional[str] = None, + compression: Optional[str] = None) -> QuerySummary: + """ + Insert data already formatted in a bytes object + :param table: Table name (whether qualified with the database name or not) + :param column_names: Sequence of column names + :param insert_block: Binary or string data already in a recognized ClickHouse format + :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param compression: Recognized ClickHouse `Accept-Encoding` header compression value + :param fmt: Valid clickhouse format + """ + + def _raw_insert(): + return self.client.raw_insert(table=table, column_names=column_names, insert_block=insert_block, + settings=settings, fmt=fmt, compression=compression) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, _raw_insert) + return result diff --git a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py index 03b5bbe324..bf3c0d863f 100644 --- a/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py +++ b/contrib/python/clickhouse-connect/clickhouse_connect/driver/httpclient.py @@ -69,7 +69,8 @@ class HttpClient(Client): https_proxy: Optional[str] = None, server_host_name: Optional[str] = None, apply_server_timezone: Optional[Union[str, bool]] = None, - show_clickhouse_errors: Optional[bool] = None): + show_clickhouse_errors: Optional[bool] = None, + autogenerate_session_id: Optional[bool] = None): """ Create an HTTP ClickHouse Connect client See clickhouse_connect.get_client for parameters @@ -93,7 +94,7 @@ class HttpClient(Client): # pylint: disable=too-many-boolean-expressions if not self.http and (server_host_name or ca_cert or client_cert or not verify or https_proxy): options = {'verify': verify is not False} - dict_add(options,'ca_cert', ca_cert) + dict_add(options, 'ca_cert', ca_cert) dict_add(options, 'client_cert', client_cert) dict_add(options, 'client_cert_key', client_cert_key) if server_host_name: @@ -128,9 +129,14 @@ class HttpClient(Client): self._progress_interval = None self._active_session = None + # allow to override the global autogenerate_session_id setting via the constructor params + _autogenerate_session_id = common.get_setting('autogenerate_session_id') \ + if autogenerate_session_id is None \ + else autogenerate_session_id + if session_id: ch_settings['session_id'] = session_id - elif 'session_id' not in ch_settings and common.get_setting('autogenerate_session_id'): + elif 'session_id' not in ch_settings and _autogenerate_session_id: ch_settings['session_id'] = str(uuid.uuid4()) if coerce_bool(compress): diff --git a/contrib/python/clickhouse-connect/ya.make b/contrib/python/clickhouse-connect/ya.make index a1b0d07909..72a6155ce2 100644 --- a/contrib/python/clickhouse-connect/ya.make +++ b/contrib/python/clickhouse-connect/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(0.7.15) +VERSION(0.7.16) LICENSE(Apache-2.0) @@ -53,6 +53,7 @@ PY_SRCS( clickhouse_connect/dbapi/connection.py clickhouse_connect/dbapi/cursor.py clickhouse_connect/driver/__init__.py + clickhouse_connect/driver/asyncclient.py clickhouse_connect/driver/buffer.py clickhouse_connect/driver/client.py clickhouse_connect/driver/common.py diff --git a/contrib/python/google-auth/py3/.dist-info/METADATA b/contrib/python/google-auth/py3/.dist-info/METADATA index 040faf7bb3..1814862af6 100644 --- a/contrib/python/google-auth/py3/.dist-info/METADATA +++ b/contrib/python/google-auth/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: google-auth -Version: 2.31.0 +Version: 2.32.0 Summary: Google Authentication Library Home-page: https://github.com/googleapis/google-auth-library-python Author: Google Cloud Platform diff --git a/contrib/python/google-auth/py3/google/auth/external_account.py b/contrib/python/google-auth/py3/google/auth/external_account.py index 3943de2a34..df0511f255 100644 --- a/contrib/python/google-auth/py3/google/auth/external_account.py +++ b/contrib/python/google-auth/py3/google/auth/external_account.py @@ -31,6 +31,7 @@ import abc import copy from dataclasses import dataclass import datetime +import functools import io import json import re @@ -394,6 +395,12 @@ class Credentials( def refresh(self, request): scopes = self._scopes if self._scopes is not None else self._default_scopes + # Inject client certificate into request. + if self._mtls_required(): + request = functools.partial( + request, cert=self._get_mtls_cert_and_key_paths() + ) + if self._should_initialize_impersonated_credentials(): self._impersonated_credentials = self._initialize_impersonated_credentials() @@ -523,6 +530,33 @@ class Credentials( return metrics_options + def _mtls_required(self): + """Returns a boolean representing whether the current credential is configured + for mTLS and should add a certificate to the outgoing calls to the sts and service + account impersonation endpoint. + + Returns: + bool: True if the credential is configured for mTLS, False if it is not. + """ + return False + + def _get_mtls_cert_and_key_paths(self): + """Gets the file locations for a certificate and private key file + to be used for configuring mTLS for the sts and service account + impersonation calls. Currently only expected to return a value when using + X509 workload identity federation. + + Returns: + Tuple[str, str]: The cert and key file locations as strings in a tuple. + + Raises: + NotImplementedError: When the current credential is not configured for + mTLS. + """ + raise NotImplementedError( + "_get_mtls_cert_and_key_location must be implemented." + ) + @classmethod def from_info(cls, info, **kwargs): """Creates a Credentials instance from parsed external account info. diff --git a/contrib/python/google-auth/py3/google/auth/identity_pool.py b/contrib/python/google-auth/py3/google/auth/identity_pool.py index 1c97885a4a..47f9a55715 100644 --- a/contrib/python/google-auth/py3/google/auth/identity_pool.py +++ b/contrib/python/google-auth/py3/google/auth/identity_pool.py @@ -48,6 +48,7 @@ from typing import NamedTuple from google.auth import _helpers from google.auth import exceptions from google.auth import external_account +from google.auth.transport import _mtls_helper class SubjectTokenSupplier(metaclass=abc.ABCMeta): @@ -141,6 +142,14 @@ class _UrlSupplier(SubjectTokenSupplier): ) +class _X509Supplier(SubjectTokenSupplier): + """Internal supplier for X509 workload credentials. This class is used internally and always returns an empty string as the subject token.""" + + @_helpers.copy_docstring(SubjectTokenSupplier) + def get_subject_token(self, context, request): + return "" + + def _parse_token_data(token_content, format_type="text", subject_token_field_name=None): if format_type == "text": token = token_content.content @@ -247,6 +256,7 @@ class Credentials(external_account.Credentials): self._subject_token_supplier = subject_token_supplier self._credential_source_file = None self._credential_source_url = None + self._credential_source_certificate = None else: if not isinstance(credential_source, Mapping): self._credential_source_executable = None @@ -255,45 +265,22 @@ class Credentials(external_account.Credentials): ) self._credential_source_file = credential_source.get("file") self._credential_source_url = credential_source.get("url") - self._credential_source_headers = credential_source.get("headers") - credential_source_format = credential_source.get("format", {}) - # Get credential_source format type. When not provided, this - # defaults to text. - self._credential_source_format_type = ( - credential_source_format.get("type") or "text" - ) + self._credential_source_certificate = credential_source.get("certificate") + # environment_id is only supported in AWS or dedicated future external # account credentials. if "environment_id" in credential_source: raise exceptions.MalformedError( "Invalid Identity Pool credential_source field 'environment_id'" ) - if self._credential_source_format_type not in ["text", "json"]: - raise exceptions.MalformedError( - "Invalid credential_source format '{}'".format( - self._credential_source_format_type - ) - ) - # For JSON types, get the required subject_token field name. - if self._credential_source_format_type == "json": - self._credential_source_field_name = credential_source_format.get( - "subject_token_field_name" - ) - if self._credential_source_field_name is None: - raise exceptions.MalformedError( - "Missing subject_token_field_name for JSON credential_source format" - ) - else: - self._credential_source_field_name = None - if self._credential_source_file and self._credential_source_url: - raise exceptions.MalformedError( - "Ambiguous credential_source. 'file' is mutually exclusive with 'url'." - ) - if not self._credential_source_file and not self._credential_source_url: - raise exceptions.MalformedError( - "Missing credential_source. A 'file' or 'url' must be provided." - ) + # check that only one of file, url, or certificate are provided. + self._validate_single_source() + + if self._credential_source_certificate: + self._validate_certificate_config() + else: + self._validate_file_or_url_config(credential_source) if self._credential_source_file: self._subject_token_supplier = _FileSupplier( @@ -301,13 +288,15 @@ class Credentials(external_account.Credentials): self._credential_source_format_type, self._credential_source_field_name, ) - else: + elif self._credential_source_url: self._subject_token_supplier = _UrlSupplier( self._credential_source_url, self._credential_source_format_type, self._credential_source_field_name, self._credential_source_headers, ) + else: # self._credential_source_certificate + self._subject_token_supplier = _X509Supplier() @_helpers.copy_docstring(external_account.Credentials) def retrieve_subject_token(self, request): @@ -315,16 +304,31 @@ class Credentials(external_account.Credentials): self._supplier_context, request ) + def _get_mtls_cert_and_key_paths(self): + if self._credential_source_certificate is None: + raise exceptions.RefreshError( + 'The credential is not configured to use mtls requests. The credential should include a "certificate" section in the credential source.' + ) + else: + return _mtls_helper._get_workload_cert_and_key_paths( + self._certificate_config_location + ) + + def _mtls_required(self): + return self._credential_source_certificate is not None + def _create_default_metrics_options(self): metrics_options = super(Credentials, self)._create_default_metrics_options() - # Check that credential source is a dict before checking for file vs url. This check needs to be done + # Check that credential source is a dict before checking for credential type. This check needs to be done # here because the external_account credential constructor needs to pass the metrics options to the # impersonated credential object before the identity_pool credentials are validated. if isinstance(self._credential_source, Mapping): if self._credential_source.get("file"): metrics_options["source"] = "file" - else: + elif self._credential_source.get("url"): metrics_options["source"] = "url" + else: + metrics_options["source"] = "x509" else: metrics_options["source"] = "programmatic" return metrics_options @@ -339,6 +343,67 @@ class Credentials(external_account.Credentials): args.update({"subject_token_supplier": self._subject_token_supplier}) return args + def _validate_certificate_config(self): + self._certificate_config_location = self._credential_source_certificate.get( + "certificate_config_location" + ) + use_default = self._credential_source_certificate.get( + "use_default_certificate_config" + ) + if self._certificate_config_location and use_default: + raise exceptions.MalformedError( + "Invalid certificate configuration, certificate_config_location cannot be specified when use_default_certificate_config = true." + ) + if not self._certificate_config_location and not use_default: + raise exceptions.MalformedError( + "Invalid certificate configuration, use_default_certificate_config should be true if no certificate_config_location is provided." + ) + + def _validate_file_or_url_config(self, credential_source): + self._credential_source_headers = credential_source.get("headers") + credential_source_format = credential_source.get("format", {}) + # Get credential_source format type. When not provided, this + # defaults to text. + self._credential_source_format_type = ( + credential_source_format.get("type") or "text" + ) + if self._credential_source_format_type not in ["text", "json"]: + raise exceptions.MalformedError( + "Invalid credential_source format '{}'".format( + self._credential_source_format_type + ) + ) + # For JSON types, get the required subject_token field name. + if self._credential_source_format_type == "json": + self._credential_source_field_name = credential_source_format.get( + "subject_token_field_name" + ) + if self._credential_source_field_name is None: + raise exceptions.MalformedError( + "Missing subject_token_field_name for JSON credential_source format" + ) + else: + self._credential_source_field_name = None + + def _validate_single_source(self): + credential_sources = [ + self._credential_source_file, + self._credential_source_url, + self._credential_source_certificate, + ] + valid_credential_sources = list( + filter(lambda source: source is not None, credential_sources) + ) + + if len(valid_credential_sources) > 1: + raise exceptions.MalformedError( + "Ambiguous credential_source. 'file', 'url', and 'certificate' are mutually exclusive.." + ) + if len(valid_credential_sources) != 1: + raise exceptions.MalformedError( + "Missing credential_source. A 'file', 'url', or 'certificate' must be provided." + ) + @classmethod def from_info(cls, info, **kwargs): """Creates an Identity Pool Credentials instance from parsed external account info. diff --git a/contrib/python/google-auth/py3/google/auth/transport/_mtls_helper.py b/contrib/python/google-auth/py3/google/auth/transport/_mtls_helper.py index e95b953a10..6299e2bdea 100644 --- a/contrib/python/google-auth/py3/google/auth/transport/_mtls_helper.py +++ b/contrib/python/google-auth/py3/google/auth/transport/_mtls_helper.py @@ -105,9 +105,50 @@ def _get_workload_cert_and_key(certificate_config_path=None): google.auth.exceptions.ClientCertError: if problems occurs when retrieving the certificate or key information. """ - absolute_path = _get_cert_config_path(certificate_config_path) + + cert_path, key_path = _get_workload_cert_and_key_paths(certificate_config_path) + + if cert_path is None and key_path is None: + return None, None + + return _read_cert_and_key_files(cert_path, key_path) + + +def _get_cert_config_path(certificate_config_path=None): + """Get the certificate configuration path based on the following order: + + 1: Explicit override, if set + 2: Environment variable, if set + 3: Well-known location + + Returns "None" if the selected config file does not exist. + + Args: + certificate_config_path (string): The certificate config path. If provided, the well known + location and environment variable will be ignored. + + Returns: + The absolute path of the certificate config file, and None if the file does not exist. + """ + + if certificate_config_path is None: + env_path = environ.get(_CERTIFICATE_CONFIGURATION_ENV, None) + if env_path is not None and env_path != "": + certificate_config_path = env_path + else: + certificate_config_path = _CERTIFICATE_CONFIGURATION_DEFAULT_PATH + + certificate_config_path = path.expanduser(certificate_config_path) + if not path.exists(certificate_config_path): + return None + return certificate_config_path + + +def _get_workload_cert_and_key_paths(config_path): + absolute_path = _get_cert_config_path(config_path) if absolute_path is None: return None, None + data = _load_json_file(absolute_path) if "cert_configs" not in data: @@ -142,37 +183,7 @@ def _get_workload_cert_and_key(certificate_config_path=None): ) key_path = workload["key_path"] - return _read_cert_and_key_files(cert_path, key_path) - - -def _get_cert_config_path(certificate_config_path=None): - """Gets the certificate configuration full path using the following order of precedence: - - 1: Explicit override, if set - 2: Environment variable, if set - 3: Well-known location - - Returns "None" if the selected config file does not exist. - - Args: - certificate_config_path (string): The certificate config path. If provided, the well known - location and environment variable will be ignored. - - Returns: - The absolute path of the certificate config file, and None if the file does not exist. - """ - - if certificate_config_path is None: - env_path = environ.get(_CERTIFICATE_CONFIGURATION_ENV, None) - if env_path is not None and env_path != "": - certificate_config_path = env_path - else: - certificate_config_path = _CERTIFICATE_CONFIGURATION_DEFAULT_PATH - - certificate_config_path = path.expanduser(certificate_config_path) - if not path.exists(certificate_config_path): - return None - return certificate_config_path + return cert_path, key_path def _read_cert_and_key_files(cert_path, key_path): diff --git a/contrib/python/google-auth/py3/google/auth/version.py b/contrib/python/google-auth/py3/google/auth/version.py index b9313c667d..51f7f62acd 100644 --- a/contrib/python/google-auth/py3/google/auth/version.py +++ b/contrib/python/google-auth/py3/google/auth/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "2.31.0" +__version__ = "2.32.0" diff --git a/contrib/python/google-auth/py3/tests/test_external_account.py b/contrib/python/google-auth/py3/tests/test_external_account.py index c458b21b64..3c372e6291 100644 --- a/contrib/python/google-auth/py3/tests/test_external_account.py +++ b/contrib/python/google-auth/py3/tests/test_external_account.py @@ -235,10 +235,16 @@ class TestCredentials(object): return request @classmethod - def assert_token_request_kwargs(cls, request_kwargs, headers, request_data): + def assert_token_request_kwargs( + cls, request_kwargs, headers, request_data, cert=None + ): assert request_kwargs["url"] == cls.TOKEN_URL assert request_kwargs["method"] == "POST" assert request_kwargs["headers"] == headers + if cert is not None: + assert request_kwargs["cert"] == cert + else: + assert "cert" not in request_kwargs assert request_kwargs["body"] is not None body_tuples = urllib.parse.parse_qsl(request_kwargs["body"]) for (k, v) in body_tuples: @@ -246,10 +252,16 @@ class TestCredentials(object): assert len(body_tuples) == len(request_data.keys()) @classmethod - def assert_impersonation_request_kwargs(cls, request_kwargs, headers, request_data): + def assert_impersonation_request_kwargs( + cls, request_kwargs, headers, request_data, cert=None + ): assert request_kwargs["url"] == cls.SERVICE_ACCOUNT_IMPERSONATION_URL assert request_kwargs["method"] == "POST" assert request_kwargs["headers"] == headers + if cert is not None: + assert request_kwargs["cert"] == cert + else: + assert "cert" not in request_kwargs assert request_kwargs["body"] is not None body_json = json.loads(request_kwargs["body"].decode("utf-8")) assert body_json == request_data @@ -670,6 +682,56 @@ class TestCredentials(object): return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, ) @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) + @mock.patch( + "google.auth.external_account.Credentials._mtls_required", return_value=True + ) + @mock.patch( + "google.auth.external_account.Credentials._get_mtls_cert_and_key_paths", + return_value=("path/to/cert.pem", "path/to/key.pem"), + ) + def test_refresh_with_mtls( + self, + mock_get_mtls_cert_and_key_paths, + mock_mtls_required, + unused_utcnow, + mock_auth_lib_value, + ): + response = self.SUCCESS_RESPONSE.copy() + # Test custom expiration to confirm expiry is set correctly. + response["expires_in"] = 2800 + expected_expiry = datetime.datetime.min + datetime.timedelta( + seconds=response["expires_in"] + ) + headers = { + "Content-Type": "application/x-www-form-urlencoded", + "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false", + } + request_data = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "audience": self.AUDIENCE, + "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", + "subject_token": "subject_token_0", + "subject_token_type": self.SUBJECT_TOKEN_TYPE, + } + request = self.make_mock_request(status=http_client.OK, data=response) + credentials = self.make_credentials() + + credentials.refresh(request) + + expected_cert_path = ("path/to/cert.pem", "path/to/key.pem") + self.assert_token_request_kwargs( + request.call_args[1], headers, request_data, expected_cert_path + ) + assert credentials.valid + assert credentials.expiry == expected_expiry + assert not credentials.expired + assert credentials.token == response["access_token"] + + @mock.patch( + "google.auth.metrics.python_and_auth_lib_version", + return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, + ) + @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) def test_refresh_workforce_without_client_auth_success( self, unused_utcnow, test_auth_lib_value ): @@ -877,6 +939,101 @@ class TestCredentials(object): "google.auth.metrics.python_and_auth_lib_version", return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, ) + @mock.patch( + "google.auth.external_account.Credentials._mtls_required", return_value=True + ) + @mock.patch( + "google.auth.external_account.Credentials._get_mtls_cert_and_key_paths", + return_value=("path/to/cert.pem", "path/to/key.pem"), + ) + def test_refresh_impersonation_with_mtls_success( + self, + mock_get_mtls_cert_and_key_paths, + mock_mtls_required, + mock_metrics_header_value, + mock_auth_lib_value, + ): + # Simulate service account access token expires in 2800 seconds. + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800) + ).isoformat("T") + "Z" + expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ") + # STS token exchange request/response. + token_response = self.SUCCESS_RESPONSE.copy() + token_headers = { + "Content-Type": "application/x-www-form-urlencoded", + "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false", + } + token_request_data = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "audience": self.AUDIENCE, + "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", + "subject_token": "subject_token_0", + "subject_token_type": self.SUBJECT_TOKEN_TYPE, + "scope": "https://www.googleapis.com/auth/iam", + } + # Service account impersonation request/response. + impersonation_response = { + "accessToken": "SA_ACCESS_TOKEN", + "expireTime": expire_time, + } + impersonation_headers = { + "Content-Type": "application/json", + "authorization": "Bearer {}".format(token_response["access_token"]), + "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, + "x-allowed-locations": "0x0", + } + impersonation_request_data = { + "delegates": None, + "scope": self.SCOPES, + "lifetime": "3600s", + } + # Initialize mock request to handle token exchange and service account + # impersonation request. + request = self.make_mock_request( + status=http_client.OK, + data=token_response, + impersonation_status=http_client.OK, + impersonation_data=impersonation_response, + ) + # Initialize credentials with service account impersonation. + credentials = self.make_credentials( + service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, + scopes=self.SCOPES, + ) + + credentials.refresh(request) + + # Only 2 requests should be processed. + assert len(request.call_args_list) == 2 + # Verify token exchange request parameters. + expected_cert_paths = ("path/to/cert.pem", "path/to/key.pem") + self.assert_token_request_kwargs( + request.call_args_list[0][1], + token_headers, + token_request_data, + expected_cert_paths, + ) + # Verify service account impersonation request parameters. + self.assert_impersonation_request_kwargs( + request.call_args_list[1][1], + impersonation_headers, + impersonation_request_data, + expected_cert_paths, + ) + assert credentials.valid + assert credentials.expiry == expected_expiry + assert not credentials.expired + assert credentials.token == impersonation_response["accessToken"] + + @mock.patch( + "google.auth.metrics.token_request_access_token_impersonate", + return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, + ) + @mock.patch( + "google.auth.metrics.python_and_auth_lib_version", + return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, + ) def test_refresh_workforce_impersonation_without_client_auth_success( self, mock_metrics_header_value, mock_auth_lib_value ): diff --git a/contrib/python/google-auth/py3/tests/test_identity_pool.py b/contrib/python/google-auth/py3/tests/test_identity_pool.py index e4efe46c6b..cc6cbf0882 100644 --- a/contrib/python/google-auth/py3/tests/test_identity_pool.py +++ b/contrib/python/google-auth/py3/tests/test_identity_pool.py @@ -180,6 +180,12 @@ class TestCredentials(object): "url": CREDENTIAL_URL, "format": {"type": "json", "subject_token_field_name": "access_token"}, } + CREDENTIAL_SOURCE_CERTIFICATE = { + "certificate": {"use_default_certificate_config": "true"} + } + CREDENTIAL_SOURCE_CERTIFICATE_NOT_DEFAULT = { + "certificate": {"certificate_config_location": "path/to/config"} + } SUCCESS_RESPONSE = { "access_token": "ACCESS_TOKEN", "issued_token_type": "urn:ietf:params:oauth:token-type:access_token", @@ -678,6 +684,40 @@ class TestCredentials(object): assert excinfo.match(r"Ambiguous credential_source") + def test_constructor_invalid_options_url_and_certificate(self): + credential_source = { + "url": self.CREDENTIAL_URL, + "certificate": {"certificate": {"use_default_certificate_config": True}}, + } + + with pytest.raises(ValueError) as excinfo: + self.make_credentials(credential_source=credential_source) + + assert excinfo.match(r"Ambiguous credential_source") + + def test_constructor_invalid_options_file_and_certificate(self): + credential_source = { + "file": SUBJECT_TOKEN_TEXT_FILE, + "certificate": {"certificate": {"use_default_certificate": True}}, + } + + with pytest.raises(ValueError) as excinfo: + self.make_credentials(credential_source=credential_source) + + assert excinfo.match(r"Ambiguous credential_source") + + def test_constructor_invalid_options_url_file_and_certificate(self): + credential_source = { + "file": SUBJECT_TOKEN_TEXT_FILE, + "url": self.CREDENTIAL_URL, + "certificate": {"certificate": {"use_default_certificate": True}}, + } + + with pytest.raises(ValueError) as excinfo: + self.make_credentials(credential_source=credential_source) + + assert excinfo.match(r"Ambiguous credential_source") + def test_constructor_invalid_options_environment_id(self): credential_source = {"url": self.CREDENTIAL_URL, "environment_id": "aws1"} @@ -717,7 +757,7 @@ class TestCredentials(object): ) def test_constructor_invalid_credential_source_format_type(self): - credential_source = {"format": {"type": "xml"}} + credential_source = {"file": "test.txt", "format": {"type": "xml"}} with pytest.raises(ValueError) as excinfo: self.make_credentials(credential_source=credential_source) @@ -725,7 +765,7 @@ class TestCredentials(object): assert excinfo.match(r"Invalid credential_source format 'xml'") def test_constructor_missing_subject_token_field_name(self): - credential_source = {"format": {"type": "json"}} + credential_source = {"file": "test.txt", "format": {"type": "json"}} with pytest.raises(ValueError) as excinfo: self.make_credentials(credential_source=credential_source) @@ -734,6 +774,27 @@ class TestCredentials(object): r"Missing subject_token_field_name for JSON credential_source format" ) + def test_constructor_default_and_file_location_certificate(self): + credential_source = { + "certificate": { + "use_default_certificate_config": True, + "certificate_config_location": "test", + } + } + + with pytest.raises(ValueError) as excinfo: + self.make_credentials(credential_source=credential_source) + + assert excinfo.match(r"Invalid certificate configuration") + + def test_constructor_no_default_or_file_location_certificate(self): + credential_source = {"certificate": {"use_default_certificate_config": False}} + + with pytest.raises(ValueError) as excinfo: + self.make_credentials(credential_source=credential_source) + + assert excinfo.match(r"Invalid certificate configuration") + def test_info_with_workforce_pool_user_project(self): credentials = self.make_credentials( audience=WORKFORCE_AUDIENCE, @@ -783,6 +844,36 @@ class TestCredentials(object): "universe_domain": DEFAULT_UNIVERSE_DOMAIN, } + def test_info_with_certificate_credential_source(self): + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_CERTIFICATE.copy() + ) + + assert credentials.info == { + "type": "external_account", + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "token_info_url": TOKEN_INFO_URL, + "credential_source": self.CREDENTIAL_SOURCE_CERTIFICATE, + "universe_domain": DEFAULT_UNIVERSE_DOMAIN, + } + + def test_info_with_non_default_certificate_credential_source(self): + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_CERTIFICATE_NOT_DEFAULT.copy() + ) + + assert credentials.info == { + "type": "external_account", + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "token_info_url": TOKEN_INFO_URL, + "credential_source": self.CREDENTIAL_SOURCE_CERTIFICATE_NOT_DEFAULT, + "universe_domain": DEFAULT_UNIVERSE_DOMAIN, + } + def test_info_with_default_token_url(self): credentials = identity_pool.Credentials( audience=AUDIENCE, @@ -846,6 +937,15 @@ class TestCredentials(object): assert subject_token == JSON_FILE_SUBJECT_TOKEN + def test_retrieve_subject_token_certificate(self): + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_CERTIFICATE + ) + + subject_token = credentials.retrieve_subject_token(None) + + assert subject_token == "" + def test_retrieve_subject_token_json_file_invalid_field_name(self): credential_source = { "file": SUBJECT_TOKEN_JSON_FILE, @@ -1486,3 +1586,28 @@ class TestCredentials(object): scopes=SCOPES, default_scopes=None, ) + + @mock.patch( + "google.auth.transport._mtls_helper._get_workload_cert_and_key_paths", + return_value=("cert", "key"), + ) + def test_get_mtls_certs(self, mock_get_workload_cert_and_key_paths): + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_CERTIFICATE.copy() + ) + + cert, key = credentials._get_mtls_cert_and_key_paths() + assert cert == "cert" + assert key == "key" + + def test_get_mtls_certs_invalid(self): + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_TEXT.copy() + ) + + with pytest.raises(exceptions.RefreshError) as excinfo: + credentials._get_mtls_cert_and_key_paths() + + assert excinfo.match( + 'The credential is not configured to use mtls requests. The credential should include a "certificate" section in the credential source.' + ) diff --git a/contrib/python/google-auth/py3/ya.make b/contrib/python/google-auth/py3/ya.make index 63ef7c67e4..4ea57aefcc 100644 --- a/contrib/python/google-auth/py3/ya.make +++ b/contrib/python/google-auth/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(2.31.0) +VERSION(2.32.0) LICENSE(Apache-2.0) |