diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-03-07 13:39:40 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-03-07 13:48:37 +0300 |
commit | a5bc35bb658487b44e707d555998bbec6cb14eab (patch) | |
tree | 4df3ffcdd48c13abc65053c681eeb9ad05901f2b | |
parent | e49f2e6094ceb19570c44590cd0e5909a104b81d (diff) | |
download | ydb-a5bc35bb658487b44e707d555998bbec6cb14eab.tar.gz |
Intermediate changes
46 files changed, 2036 insertions, 668 deletions
diff --git a/contrib/libs/cxxsupp/libcxx/patches/00-future-2023-11-11-license-typo-acb9156266206d53134e986bb4481b900bc4db75.patch b/contrib/libs/cxxsupp/libcxx/patches/00-future-2023-11-11-license-typo-acb9156266206d53134e986bb4481b900bc4db75.patch new file mode 100644 index 0000000000..bed99946dc --- /dev/null +++ b/contrib/libs/cxxsupp/libcxx/patches/00-future-2023-11-11-license-typo-acb9156266206d53134e986bb4481b900bc4db75.patch @@ -0,0 +1,12 @@ +diff --git a/include/__algorithm/ranges_equal_range.h b/include/__algorithm/ranges_equal_range.h +index ed78cf3..1ff8856 100644 +--- a/include/__algorithm/ranges_equal_range.h ++++ b/include/__algorithm/ranges_equal_range.h +@@ -1,6 +1,6 @@ + //===----------------------------------------------------------------------===// + // +-// Part of the LLVM __project, under the Apache License v2.0 with LLVM Exceptions. ++// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. + // See https://llvm.org/LICENSE.txt for license information. + // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + // diff --git a/contrib/python/google-auth/py3/.dist-info/METADATA b/contrib/python/google-auth/py3/.dist-info/METADATA index c8e96994cf..898701cac8 100644 --- a/contrib/python/google-auth/py3/.dist-info/METADATA +++ b/contrib/python/google-auth/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: google-auth -Version: 2.28.0 +Version: 2.28.1 Summary: Google Authentication Library Home-page: https://github.com/googleapis/google-auth-library-python Author: Google Cloud Platform diff --git a/contrib/python/google-auth/py3/google/auth/_refresh_worker.py b/contrib/python/google-auth/py3/google/auth/_refresh_worker.py index 9bb0ccc2c5..674032d849 100644 --- a/contrib/python/google-auth/py3/google/auth/_refresh_worker.py +++ b/contrib/python/google-auth/py3/google/auth/_refresh_worker.py @@ -75,7 +75,7 @@ class RefreshThreadManager: def __setstate__(self, state): """Pickle helper that deserializes the _lock attribute.""" - state["_key"] = threading.Lock() + state["_lock"] = threading.Lock() self.__dict__.update(state) diff --git a/contrib/python/google-auth/py3/google/auth/version.py b/contrib/python/google-auth/py3/google/auth/version.py index 9672a6c412..7580efbee5 100644 --- a/contrib/python/google-auth/py3/google/auth/version.py +++ b/contrib/python/google-auth/py3/google/auth/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "2.28.0" +__version__ = "2.28.1" diff --git a/contrib/python/google-auth/py3/tests/test__refresh_worker.py b/contrib/python/google-auth/py3/tests/test__refresh_worker.py index f842b02cac..c25965d10b 100644 --- a/contrib/python/google-auth/py3/tests/test__refresh_worker.py +++ b/contrib/python/google-auth/py3/tests/test__refresh_worker.py @@ -150,7 +150,10 @@ def test_refresh_dead_worker(): def test_pickle(): w = _refresh_worker.RefreshThreadManager() + # For some reason isinstance cannot interpret threading.Lock as a type. + assert w._lock is not None pickled_manager = pickle.dumps(w) manager = pickle.loads(pickled_manager) assert isinstance(manager, _refresh_worker.RefreshThreadManager) + assert manager._lock is not None diff --git a/contrib/python/google-auth/py3/ya.make b/contrib/python/google-auth/py3/ya.make index 7863862fdc..83e92e22ff 100644 --- a/contrib/python/google-auth/py3/ya.make +++ b/contrib/python/google-auth/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(2.28.0) +VERSION(2.28.1) LICENSE(Apache-2.0) diff --git a/contrib/python/httpcore/.dist-info/METADATA b/contrib/python/httpcore/.dist-info/METADATA index 51de714c58..7804db1a8b 100644 --- a/contrib/python/httpcore/.dist-info/METADATA +++ b/contrib/python/httpcore/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: httpcore -Version: 1.0.3 +Version: 1.0.4 Summary: A minimal low-level HTTP client. Project-URL: Documentation, https://www.encode.io/httpcore Project-URL: Homepage, https://www.encode.io/httpcore/ @@ -33,7 +33,7 @@ Requires-Dist: h2<5,>=3; extra == 'http2' Provides-Extra: socks Requires-Dist: socksio==1.*; extra == 'socks' Provides-Extra: trio -Requires-Dist: trio<0.24.0,>=0.22.0; extra == 'trio' +Requires-Dist: trio<0.25.0,>=0.22.0; extra == 'trio' Description-Content-Type: text/markdown # HTTP Core @@ -153,6 +153,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). +## 1.0.4 (February 21st, 2024) + +- Add `target` request extension. (#888) +- Fix support for connection `Upgrade` and `CONNECT` when some data in the stream has been read. (#882) + ## 1.0.3 (February 13th, 2024) - Fix support for async cancellations. (#880) diff --git a/contrib/python/httpcore/httpcore/__init__.py b/contrib/python/httpcore/httpcore/__init__.py index 3709fc4080..ea2a240921 100644 --- a/contrib/python/httpcore/httpcore/__init__.py +++ b/contrib/python/httpcore/httpcore/__init__.py @@ -130,7 +130,7 @@ __all__ = [ "WriteError", ] -__version__ = "1.0.3" +__version__ = "1.0.4" __locals = locals() diff --git a/contrib/python/httpcore/httpcore/_async/http11.py b/contrib/python/httpcore/httpcore/_async/http11.py index a5eb480840..0493a923dc 100644 --- a/contrib/python/httpcore/httpcore/_async/http11.py +++ b/contrib/python/httpcore/httpcore/_async/http11.py @@ -1,8 +1,10 @@ import enum import logging +import ssl import time from types import TracebackType from typing import ( + Any, AsyncIterable, AsyncIterator, List, @@ -107,6 +109,7 @@ class AsyncHTTP11Connection(AsyncConnectionInterface): status, reason_phrase, headers, + trailing_data, ) = await self._receive_response_headers(**kwargs) trace.return_value = ( http_version, @@ -115,6 +118,14 @@ class AsyncHTTP11Connection(AsyncConnectionInterface): headers, ) + network_stream = self._network_stream + + # CONNECT or Upgrade request + if (status == 101) or ( + (request.method == b"CONNECT") and (200 <= status < 300) + ): + network_stream = AsyncHTTP11UpgradeStream(network_stream, trailing_data) + return Response( status=status, headers=headers, @@ -122,7 +133,7 @@ class AsyncHTTP11Connection(AsyncConnectionInterface): extensions={ "http_version": http_version, "reason_phrase": reason_phrase, - "network_stream": self._network_stream, + "network_stream": network_stream, }, ) except BaseException as exc: @@ -167,7 +178,7 @@ class AsyncHTTP11Connection(AsyncConnectionInterface): async def _receive_response_headers( self, request: Request - ) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]]]: + ) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]], bytes]: timeouts = request.extensions.get("timeout", {}) timeout = timeouts.get("read", None) @@ -187,7 +198,9 @@ class AsyncHTTP11Connection(AsyncConnectionInterface): # raw header casing, rather than the enforced lowercase headers. headers = event.headers.raw_items() - return http_version, event.status_code, event.reason, headers + trailing_data, _ = self._h11_state.trailing_data + + return http_version, event.status_code, event.reason, headers, trailing_data async def _receive_response_body(self, request: Request) -> AsyncIterator[bytes]: timeouts = request.extensions.get("timeout", {}) @@ -340,3 +353,34 @@ class HTTP11ConnectionByteStream: self._closed = True async with Trace("response_closed", logger, self._request): await self._connection._response_closed() + + +class AsyncHTTP11UpgradeStream(AsyncNetworkStream): + def __init__(self, stream: AsyncNetworkStream, leading_data: bytes) -> None: + self._stream = stream + self._leading_data = leading_data + + async def read(self, max_bytes: int, timeout: Optional[float] = None) -> bytes: + if self._leading_data: + buffer = self._leading_data[:max_bytes] + self._leading_data = self._leading_data[max_bytes:] + return buffer + else: + return await self._stream.read(max_bytes, timeout) + + async def write(self, buffer: bytes, timeout: Optional[float] = None) -> None: + await self._stream.write(buffer, timeout) + + async def aclose(self) -> None: + await self._stream.aclose() + + async def start_tls( + self, + ssl_context: ssl.SSLContext, + server_hostname: Optional[str] = None, + timeout: Optional[float] = None, + ) -> AsyncNetworkStream: + return await self._stream.start_tls(ssl_context, server_hostname, timeout) + + def get_extra_info(self, info: str) -> Any: + return self._stream.get_extra_info(info) diff --git a/contrib/python/httpcore/httpcore/_models.py b/contrib/python/httpcore/httpcore/_models.py index 11bfcd84f0..397bd758d0 100644 --- a/contrib/python/httpcore/httpcore/_models.py +++ b/contrib/python/httpcore/httpcore/_models.py @@ -353,6 +353,14 @@ class Request: ) self.extensions = {} if extensions is None else extensions + if "target" in self.extensions: + self.url = URL( + scheme=self.url.scheme, + host=self.url.host, + port=self.url.port, + target=self.extensions["target"], + ) + def __repr__(self) -> str: return f"<{self.__class__.__name__} [{self.method!r}]>" diff --git a/contrib/python/httpcore/httpcore/_sync/http11.py b/contrib/python/httpcore/httpcore/_sync/http11.py index e108f88b12..a74ff8e809 100644 --- a/contrib/python/httpcore/httpcore/_sync/http11.py +++ b/contrib/python/httpcore/httpcore/_sync/http11.py @@ -1,8 +1,10 @@ import enum import logging +import ssl import time from types import TracebackType from typing import ( + Any, Iterable, Iterator, List, @@ -107,6 +109,7 @@ class HTTP11Connection(ConnectionInterface): status, reason_phrase, headers, + trailing_data, ) = self._receive_response_headers(**kwargs) trace.return_value = ( http_version, @@ -115,6 +118,14 @@ class HTTP11Connection(ConnectionInterface): headers, ) + network_stream = self._network_stream + + # CONNECT or Upgrade request + if (status == 101) or ( + (request.method == b"CONNECT") and (200 <= status < 300) + ): + network_stream = HTTP11UpgradeStream(network_stream, trailing_data) + return Response( status=status, headers=headers, @@ -122,7 +133,7 @@ class HTTP11Connection(ConnectionInterface): extensions={ "http_version": http_version, "reason_phrase": reason_phrase, - "network_stream": self._network_stream, + "network_stream": network_stream, }, ) except BaseException as exc: @@ -167,7 +178,7 @@ class HTTP11Connection(ConnectionInterface): def _receive_response_headers( self, request: Request - ) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]]]: + ) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]], bytes]: timeouts = request.extensions.get("timeout", {}) timeout = timeouts.get("read", None) @@ -187,7 +198,9 @@ class HTTP11Connection(ConnectionInterface): # raw header casing, rather than the enforced lowercase headers. headers = event.headers.raw_items() - return http_version, event.status_code, event.reason, headers + trailing_data, _ = self._h11_state.trailing_data + + return http_version, event.status_code, event.reason, headers, trailing_data def _receive_response_body(self, request: Request) -> Iterator[bytes]: timeouts = request.extensions.get("timeout", {}) @@ -340,3 +353,34 @@ class HTTP11ConnectionByteStream: self._closed = True with Trace("response_closed", logger, self._request): self._connection._response_closed() + + +class HTTP11UpgradeStream(NetworkStream): + def __init__(self, stream: NetworkStream, leading_data: bytes) -> None: + self._stream = stream + self._leading_data = leading_data + + def read(self, max_bytes: int, timeout: Optional[float] = None) -> bytes: + if self._leading_data: + buffer = self._leading_data[:max_bytes] + self._leading_data = self._leading_data[max_bytes:] + return buffer + else: + return self._stream.read(max_bytes, timeout) + + def write(self, buffer: bytes, timeout: Optional[float] = None) -> None: + self._stream.write(buffer, timeout) + + def close(self) -> None: + self._stream.close() + + def start_tls( + self, + ssl_context: ssl.SSLContext, + server_hostname: Optional[str] = None, + timeout: Optional[float] = None, + ) -> NetworkStream: + return self._stream.start_tls(ssl_context, server_hostname, timeout) + + def get_extra_info(self, info: str) -> Any: + return self._stream.get_extra_info(info) diff --git a/contrib/python/httpcore/ya.make b/contrib/python/httpcore/ya.make index e8408ed893..66ea31672f 100644 --- a/contrib/python/httpcore/ya.make +++ b/contrib/python/httpcore/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(1.0.3) +VERSION(1.0.4) LICENSE(BSD-3-Clause) diff --git a/contrib/python/httpx/.dist-info/METADATA b/contrib/python/httpx/.dist-info/METADATA index ab2b707fcd..b5ec37c7d9 100644 --- a/contrib/python/httpx/.dist-info/METADATA +++ b/contrib/python/httpx/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: httpx -Version: 0.26.0 +Version: 0.27.0 Summary: The next generation HTTP client. Project-URL: Changelog, https://github.com/encode/httpx/blob/master/CHANGELOG.md Project-URL: Documentation, https://www.python-httpx.org @@ -194,21 +194,14 @@ inspiration around the lower-level networking details. ## Release Information -### Added - -* The `proxy` argument was added. You should use the `proxy` argument instead of the deprecated `proxies`, or use `mounts=` for more complex configurations. (#2879) - ### Deprecated -* The `proxies` argument is now deprecated. It will still continue to work, but it will be removed in the future. (#2879) +* The `app=...` shortcut has been deprecated. Use the explicit style of `transport=httpx.WSGITransport()` or `transport=httpx.ASGITransport()` instead. ### Fixed -* Fix cases of double escaping of URL path components. Allow / as a safe character in the query portion. (#2990) -* Handle `NO_PROXY` envvar cases when a fully qualified URL is supplied as the value. (#2741) -* Allow URLs where username or password contains unescaped '@'. (#2986) -* Ensure ASGI `raw_path` does not include URL query component. (#2999) -* Ensure `Response.iter_text()` cannot yield empty strings. (#2998) +* Respect the `http1` argument while configuring proxy transports. (#3023) +* Fix RFC 2069 mode digest authentication. (#3045) --- diff --git a/contrib/python/httpx/httpx/__version__.py b/contrib/python/httpx/httpx/__version__.py index 3edc842c69..c121a898de 100644 --- a/contrib/python/httpx/httpx/__version__.py +++ b/contrib/python/httpx/httpx/__version__.py @@ -1,3 +1,3 @@ __title__ = "httpx" __description__ = "A next generation HTTP client, for Python 3." -__version__ = "0.26.0" +__version__ = "0.27.0" diff --git a/contrib/python/httpx/httpx/_api.py b/contrib/python/httpx/httpx/_api.py index c7af947218..b5821cc49e 100644 --- a/contrib/python/httpx/httpx/_api.py +++ b/contrib/python/httpx/httpx/_api.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import typing from contextlib import contextmanager @@ -25,20 +27,20 @@ def request( method: str, url: URLTypes, *, - params: typing.Optional[QueryParamTypes] = None, - content: typing.Optional[RequestContent] = None, - data: typing.Optional[RequestData] = None, - files: typing.Optional[RequestFiles] = None, - json: typing.Optional[typing.Any] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Optional[AuthTypes] = None, - proxy: typing.Optional[ProxyTypes] = None, - proxies: typing.Optional[ProxiesTypes] = None, + params: QueryParamTypes | None = None, + content: RequestContent | None = None, + data: RequestData | None = None, + files: RequestFiles | None = None, + json: typing.Any | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | None = None, + proxy: ProxyTypes | None = None, + proxies: ProxiesTypes | None = None, timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG, follow_redirects: bool = False, verify: VerifyTypes = True, - cert: typing.Optional[CertTypes] = None, + cert: CertTypes | None = None, trust_env: bool = True, ) -> Response: """ @@ -120,20 +122,20 @@ def stream( method: str, url: URLTypes, *, - params: typing.Optional[QueryParamTypes] = None, - content: typing.Optional[RequestContent] = None, - data: typing.Optional[RequestData] = None, - files: typing.Optional[RequestFiles] = None, - json: typing.Optional[typing.Any] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Optional[AuthTypes] = None, - proxy: typing.Optional[ProxyTypes] = None, - proxies: typing.Optional[ProxiesTypes] = None, + params: QueryParamTypes | None = None, + content: RequestContent | None = None, + data: RequestData | None = None, + files: RequestFiles | None = None, + json: typing.Any | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | None = None, + proxy: ProxyTypes | None = None, + proxies: ProxiesTypes | None = None, timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG, follow_redirects: bool = False, verify: VerifyTypes = True, - cert: typing.Optional[CertTypes] = None, + cert: CertTypes | None = None, trust_env: bool = True, ) -> typing.Iterator[Response]: """ @@ -173,14 +175,14 @@ def stream( def get( url: URLTypes, *, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Optional[AuthTypes] = None, - proxy: typing.Optional[ProxyTypes] = None, - proxies: typing.Optional[ProxiesTypes] = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | None = None, + proxy: ProxyTypes | None = None, + proxies: ProxiesTypes | None = None, follow_redirects: bool = False, - cert: typing.Optional[CertTypes] = None, + cert: CertTypes | None = None, verify: VerifyTypes = True, timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG, trust_env: bool = True, @@ -213,14 +215,14 @@ def get( def options( url: URLTypes, *, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Optional[AuthTypes] = None, - proxy: typing.Optional[ProxyTypes] = None, - proxies: typing.Optional[ProxiesTypes] = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | None = None, + proxy: ProxyTypes | None = None, + proxies: ProxiesTypes | None = None, follow_redirects: bool = False, - cert: typing.Optional[CertTypes] = None, + cert: CertTypes | None = None, verify: VerifyTypes = True, timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG, trust_env: bool = True, @@ -253,14 +255,14 @@ def options( def head( url: URLTypes, *, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Optional[AuthTypes] = None, - proxy: typing.Optional[ProxyTypes] = None, - proxies: typing.Optional[ProxiesTypes] = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | None = None, + proxy: ProxyTypes | None = None, + proxies: ProxiesTypes | None = None, follow_redirects: bool = False, - cert: typing.Optional[CertTypes] = None, + cert: CertTypes | None = None, verify: VerifyTypes = True, timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG, trust_env: bool = True, @@ -293,18 +295,18 @@ def head( def post( url: URLTypes, *, - content: typing.Optional[RequestContent] = None, - data: typing.Optional[RequestData] = None, - files: typing.Optional[RequestFiles] = None, - json: typing.Optional[typing.Any] = None, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Optional[AuthTypes] = None, - proxy: typing.Optional[ProxyTypes] = None, - proxies: typing.Optional[ProxiesTypes] = None, + content: RequestContent | None = None, + data: RequestData | None = None, + files: RequestFiles | None = None, + json: typing.Any | None = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | None = None, + proxy: ProxyTypes | None = None, + proxies: ProxiesTypes | None = None, follow_redirects: bool = False, - cert: typing.Optional[CertTypes] = None, + cert: CertTypes | None = None, verify: VerifyTypes = True, timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG, trust_env: bool = True, @@ -338,18 +340,18 @@ def post( def put( url: URLTypes, *, - content: typing.Optional[RequestContent] = None, - data: typing.Optional[RequestData] = None, - files: typing.Optional[RequestFiles] = None, - json: typing.Optional[typing.Any] = None, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Optional[AuthTypes] = None, - proxy: typing.Optional[ProxyTypes] = None, - proxies: typing.Optional[ProxiesTypes] = None, + content: RequestContent | None = None, + data: RequestData | None = None, + files: RequestFiles | None = None, + json: typing.Any | None = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | None = None, + proxy: ProxyTypes | None = None, + proxies: ProxiesTypes | None = None, follow_redirects: bool = False, - cert: typing.Optional[CertTypes] = None, + cert: CertTypes | None = None, verify: VerifyTypes = True, timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG, trust_env: bool = True, @@ -383,18 +385,18 @@ def put( def patch( url: URLTypes, *, - content: typing.Optional[RequestContent] = None, - data: typing.Optional[RequestData] = None, - files: typing.Optional[RequestFiles] = None, - json: typing.Optional[typing.Any] = None, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Optional[AuthTypes] = None, - proxy: typing.Optional[ProxyTypes] = None, - proxies: typing.Optional[ProxiesTypes] = None, + content: RequestContent | None = None, + data: RequestData | None = None, + files: RequestFiles | None = None, + json: typing.Any | None = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | None = None, + proxy: ProxyTypes | None = None, + proxies: ProxiesTypes | None = None, follow_redirects: bool = False, - cert: typing.Optional[CertTypes] = None, + cert: CertTypes | None = None, verify: VerifyTypes = True, timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG, trust_env: bool = True, @@ -428,14 +430,14 @@ def patch( def delete( url: URLTypes, *, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Optional[AuthTypes] = None, - proxy: typing.Optional[ProxyTypes] = None, - proxies: typing.Optional[ProxiesTypes] = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | None = None, + proxy: ProxyTypes | None = None, + proxies: ProxiesTypes | None = None, follow_redirects: bool = False, - cert: typing.Optional[CertTypes] = None, + cert: CertTypes | None = None, verify: VerifyTypes = True, timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG, trust_env: bool = True, diff --git a/contrib/python/httpx/httpx/_auth.py b/contrib/python/httpx/httpx/_auth.py index 66132500ff..903e399617 100644 --- a/contrib/python/httpx/httpx/_auth.py +++ b/contrib/python/httpx/httpx/_auth.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import hashlib import os import re @@ -124,18 +126,14 @@ class BasicAuth(Auth): and uses HTTP Basic authentication. """ - def __init__( - self, username: typing.Union[str, bytes], password: typing.Union[str, bytes] - ) -> None: + def __init__(self, username: str | bytes, password: str | bytes) -> None: self._auth_header = self._build_auth_header(username, password) def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]: request.headers["Authorization"] = self._auth_header yield request - def _build_auth_header( - self, username: typing.Union[str, bytes], password: typing.Union[str, bytes] - ) -> str: + def _build_auth_header(self, username: str | bytes, password: str | bytes) -> str: userpass = b":".join((to_bytes(username), to_bytes(password))) token = b64encode(userpass).decode() return f"Basic {token}" @@ -146,7 +144,7 @@ class NetRCAuth(Auth): Use a 'netrc' file to lookup basic auth credentials based on the url host. """ - def __init__(self, file: typing.Optional[str] = None) -> None: + def __init__(self, file: str | None = None) -> None: # Lazily import 'netrc'. # There's no need for us to load this module unless 'NetRCAuth' is being used. import netrc @@ -165,16 +163,14 @@ class NetRCAuth(Auth): ) yield request - def _build_auth_header( - self, username: typing.Union[str, bytes], password: typing.Union[str, bytes] - ) -> str: + def _build_auth_header(self, username: str | bytes, password: str | bytes) -> str: userpass = b":".join((to_bytes(username), to_bytes(password))) token = b64encode(userpass).decode() return f"Basic {token}" class DigestAuth(Auth): - _ALGORITHM_TO_HASH_FUNCTION: typing.Dict[str, typing.Callable[[bytes], "_Hash"]] = { + _ALGORITHM_TO_HASH_FUNCTION: dict[str, typing.Callable[[bytes], _Hash]] = { "MD5": hashlib.md5, "MD5-SESS": hashlib.md5, "SHA": hashlib.sha1, @@ -185,12 +181,10 @@ class DigestAuth(Auth): "SHA-512-SESS": hashlib.sha512, } - def __init__( - self, username: typing.Union[str, bytes], password: typing.Union[str, bytes] - ) -> None: + def __init__(self, username: str | bytes, password: str | bytes) -> None: self._username = to_bytes(username) self._password = to_bytes(password) - self._last_challenge: typing.Optional[_DigestAuthChallenge] = None + self._last_challenge: _DigestAuthChallenge | None = None self._nonce_count = 1 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]: @@ -226,7 +220,7 @@ class DigestAuth(Auth): def _parse_challenge( self, request: Request, response: Response, auth_header: str - ) -> "_DigestAuthChallenge": + ) -> _DigestAuthChallenge: """ Returns a challenge from a Digest WWW-Authenticate header. These take the form of: @@ -237,7 +231,7 @@ class DigestAuth(Auth): # This method should only ever have been called with a Digest auth header. assert scheme.lower() == "digest" - header_dict: typing.Dict[str, str] = {} + header_dict: dict[str, str] = {} for field in parse_http_list(fields): key, value = field.strip().split("=", 1) header_dict[key] = unquote(value) @@ -256,7 +250,7 @@ class DigestAuth(Auth): raise ProtocolError(message, request=request) from exc def _build_auth_header( - self, request: Request, challenge: "_DigestAuthChallenge" + self, request: Request, challenge: _DigestAuthChallenge ) -> str: hash_func = self._ALGORITHM_TO_HASH_FUNCTION[challenge.algorithm.upper()] @@ -280,17 +274,18 @@ class DigestAuth(Auth): qop = self._resolve_qop(challenge.qop, request=request) if qop is None: + # Following RFC 2069 digest_data = [HA1, challenge.nonce, HA2] else: - digest_data = [challenge.nonce, nc_value, cnonce, qop, HA2] - key_digest = b":".join(digest_data) + # Following RFC 2617/7616 + digest_data = [HA1, challenge.nonce, nc_value, cnonce, qop, HA2] format_args = { "username": self._username, "realm": challenge.realm, "nonce": challenge.nonce, "uri": path, - "response": digest(b":".join((HA1, key_digest))), + "response": digest(b":".join(digest_data)), "algorithm": challenge.algorithm.encode(), } if challenge.opaque: @@ -310,7 +305,7 @@ class DigestAuth(Auth): return hashlib.sha1(s).hexdigest()[:16].encode() - def _get_header_value(self, header_fields: typing.Dict[str, bytes]) -> str: + def _get_header_value(self, header_fields: dict[str, bytes]) -> str: NON_QUOTED_FIELDS = ("algorithm", "qop", "nc") QUOTED_TEMPLATE = '{}="{}"' NON_QUOTED_TEMPLATE = "{}={}" @@ -328,9 +323,7 @@ class DigestAuth(Auth): return header_value - def _resolve_qop( - self, qop: typing.Optional[bytes], request: Request - ) -> typing.Optional[bytes]: + def _resolve_qop(self, qop: bytes | None, request: Request) -> bytes | None: if qop is None: return None qops = re.split(b", ?", qop) @@ -348,5 +341,5 @@ class _DigestAuthChallenge(typing.NamedTuple): realm: bytes nonce: bytes algorithm: str - opaque: typing.Optional[bytes] - qop: typing.Optional[bytes] + opaque: bytes | None + qop: bytes | None diff --git a/contrib/python/httpx/httpx/_client.py b/contrib/python/httpx/httpx/_client.py index 2813a84f01..e2c6702e0c 100644 --- a/contrib/python/httpx/httpx/_client.py +++ b/contrib/python/httpx/httpx/_client.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import datetime import enum import logging @@ -160,19 +162,17 @@ class BaseClient: def __init__( self, *, - auth: typing.Optional[AuthTypes] = None, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, + auth: AuthTypes | None = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG, follow_redirects: bool = False, max_redirects: int = DEFAULT_MAX_REDIRECTS, - event_hooks: typing.Optional[ - typing.Mapping[str, typing.List[EventHook]] - ] = None, + event_hooks: None | (typing.Mapping[str, list[EventHook]]) = None, base_url: URLTypes = "", trust_env: bool = True, - default_encoding: typing.Union[str, typing.Callable[[bytes], str]] = "utf-8", + default_encoding: str | typing.Callable[[bytes], str] = "utf-8", ) -> None: event_hooks = {} if event_hooks is None else event_hooks @@ -210,8 +210,8 @@ class BaseClient: return url.copy_with(raw_path=url.raw_path + b"/") def _get_proxy_map( - self, proxies: typing.Optional[ProxiesTypes], allow_env_proxies: bool - ) -> typing.Dict[str, typing.Optional[Proxy]]: + self, proxies: ProxiesTypes | None, allow_env_proxies: bool + ) -> dict[str, Proxy | None]: if proxies is None: if allow_env_proxies: return { @@ -238,20 +238,18 @@ class BaseClient: self._timeout = Timeout(timeout) @property - def event_hooks(self) -> typing.Dict[str, typing.List[EventHook]]: + def event_hooks(self) -> dict[str, list[EventHook]]: return self._event_hooks @event_hooks.setter - def event_hooks( - self, event_hooks: typing.Dict[str, typing.List[EventHook]] - ) -> None: + def event_hooks(self, event_hooks: dict[str, list[EventHook]]) -> None: self._event_hooks = { "request": list(event_hooks.get("request", [])), "response": list(event_hooks.get("response", [])), } @property - def auth(self) -> typing.Optional[Auth]: + def auth(self) -> Auth | None: """ Authentication class used when none is passed at the request-level. @@ -323,15 +321,15 @@ class BaseClient: method: str, url: URLTypes, *, - content: typing.Optional[RequestContent] = None, - data: typing.Optional[RequestData] = None, - files: typing.Optional[RequestFiles] = None, - json: typing.Optional[typing.Any] = None, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - extensions: typing.Optional[RequestExtensions] = None, + content: RequestContent | None = None, + data: RequestData | None = None, + files: RequestFiles | None = None, + json: typing.Any | None = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, + extensions: RequestExtensions | None = None, ) -> Request: """ Build and return a request instance. @@ -391,9 +389,7 @@ class BaseClient: return self.base_url.copy_with(raw_path=merge_raw_path) return merge_url - def _merge_cookies( - self, cookies: typing.Optional[CookieTypes] = None - ) -> typing.Optional[CookieTypes]: + def _merge_cookies(self, cookies: CookieTypes | None = None) -> CookieTypes | None: """ Merge a cookies argument together with any cookies on the client, to create the cookies used for the outgoing request. @@ -404,9 +400,7 @@ class BaseClient: return merged_cookies return cookies - def _merge_headers( - self, headers: typing.Optional[HeaderTypes] = None - ) -> typing.Optional[HeaderTypes]: + def _merge_headers(self, headers: HeaderTypes | None = None) -> HeaderTypes | None: """ Merge a headers argument together with any headers on the client, to create the headers used for the outgoing request. @@ -416,8 +410,8 @@ class BaseClient: return merged_headers def _merge_queryparams( - self, params: typing.Optional[QueryParamTypes] = None - ) -> typing.Optional[QueryParamTypes]: + self, params: QueryParamTypes | None = None + ) -> QueryParamTypes | None: """ Merge a queryparams argument together with any queryparams on the client, to create the queryparams used for the outgoing request. @@ -427,7 +421,7 @@ class BaseClient: return merged_queryparams.merge(params) return params - def _build_auth(self, auth: typing.Optional[AuthTypes]) -> typing.Optional[Auth]: + def _build_auth(self, auth: AuthTypes | None) -> Auth | None: if auth is None: return None elif isinstance(auth, tuple): @@ -442,7 +436,7 @@ class BaseClient: def _build_request_auth( self, request: Request, - auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT, + auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT, ) -> Auth: auth = ( self._auth if isinstance(auth, UseClientDefault) else self._build_auth(auth) @@ -557,7 +551,7 @@ class BaseClient: def _redirect_stream( self, request: Request, method: str - ) -> typing.Optional[typing.Union[SyncByteStream, AsyncByteStream]]: + ) -> SyncByteStream | AsyncByteStream | None: """ Return the body that should be used for the redirect request. """ @@ -598,6 +592,8 @@ class Client(BaseClient): to authenticate the client. Either a path to an SSL certificate file, or two-tuple of (certificate file, key file), or a three-tuple of (certificate file, key file, password). + * **http2** - *(optional)* A boolean indicating if HTTP/2 support should be + enabled. Defaults to `False`. * **proxy** - *(optional)* A proxy URL where all the traffic should be routed. * **proxies** - *(optional)* A dictionary mapping proxy keys to proxy URLs. @@ -622,31 +618,27 @@ class Client(BaseClient): def __init__( self, *, - auth: typing.Optional[AuthTypes] = None, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, + auth: AuthTypes | None = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, verify: VerifyTypes = True, - cert: typing.Optional[CertTypes] = None, + cert: CertTypes | None = None, http1: bool = True, http2: bool = False, - proxy: typing.Optional[ProxyTypes] = None, - proxies: typing.Optional[ProxiesTypes] = None, - mounts: typing.Optional[ - typing.Mapping[str, typing.Optional[BaseTransport]] - ] = None, + proxy: ProxyTypes | None = None, + proxies: ProxiesTypes | None = None, + mounts: None | (typing.Mapping[str, BaseTransport | None]) = None, timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG, follow_redirects: bool = False, limits: Limits = DEFAULT_LIMITS, max_redirects: int = DEFAULT_MAX_REDIRECTS, - event_hooks: typing.Optional[ - typing.Mapping[str, typing.List[EventHook]] - ] = None, + event_hooks: None | (typing.Mapping[str, list[EventHook]]) = None, base_url: URLTypes = "", - transport: typing.Optional[BaseTransport] = None, - app: typing.Optional[typing.Callable[..., typing.Any]] = None, + transport: BaseTransport | None = None, + app: typing.Callable[..., typing.Any] | None = None, trust_env: bool = True, - default_encoding: typing.Union[str, typing.Callable[[bytes], str]] = "utf-8", + default_encoding: str | typing.Callable[[bytes], str] = "utf-8", ) -> None: super().__init__( auth=auth, @@ -680,6 +672,13 @@ class Client(BaseClient): if proxy: raise RuntimeError("Use either `proxy` or 'proxies', not both.") + if app: + message = ( + "The 'app' shortcut is now deprecated." + " Use the explicit style 'transport=WSGITransport(app=...)' instead." + ) + warnings.warn(message, DeprecationWarning) + allow_env_proxies = trust_env and app is None and transport is None proxy_map = self._get_proxy_map(proxies or proxy, allow_env_proxies) @@ -693,7 +692,7 @@ class Client(BaseClient): app=app, trust_env=trust_env, ) - self._mounts: typing.Dict[URLPattern, typing.Optional[BaseTransport]] = { + self._mounts: dict[URLPattern, BaseTransport | None] = { URLPattern(key): None if proxy is None else self._init_proxy_transport( @@ -717,12 +716,12 @@ class Client(BaseClient): def _init_transport( self, verify: VerifyTypes = True, - cert: typing.Optional[CertTypes] = None, + cert: CertTypes | None = None, http1: bool = True, http2: bool = False, limits: Limits = DEFAULT_LIMITS, - transport: typing.Optional[BaseTransport] = None, - app: typing.Optional[typing.Callable[..., typing.Any]] = None, + transport: BaseTransport | None = None, + app: typing.Callable[..., typing.Any] | None = None, trust_env: bool = True, ) -> BaseTransport: if transport is not None: @@ -744,7 +743,7 @@ class Client(BaseClient): self, proxy: Proxy, verify: VerifyTypes = True, - cert: typing.Optional[CertTypes] = None, + cert: CertTypes | None = None, http1: bool = True, http2: bool = False, limits: Limits = DEFAULT_LIMITS, @@ -776,17 +775,17 @@ class Client(BaseClient): method: str, url: URLTypes, *, - content: typing.Optional[RequestContent] = None, - data: typing.Optional[RequestData] = None, - files: typing.Optional[RequestFiles] = None, - json: typing.Optional[typing.Any] = None, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT, - follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT, - timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - extensions: typing.Optional[RequestExtensions] = None, + content: RequestContent | None = None, + data: RequestData | None = None, + files: RequestFiles | None = None, + json: typing.Any | None = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT, + follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, + timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, + extensions: RequestExtensions | None = None, ) -> Response: """ Build and send a request. @@ -833,17 +832,17 @@ class Client(BaseClient): method: str, url: URLTypes, *, - content: typing.Optional[RequestContent] = None, - data: typing.Optional[RequestData] = None, - files: typing.Optional[RequestFiles] = None, - json: typing.Optional[typing.Any] = None, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT, - follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT, - timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - extensions: typing.Optional[RequestExtensions] = None, + content: RequestContent | None = None, + data: RequestData | None = None, + files: RequestFiles | None = None, + json: typing.Any | None = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT, + follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, + timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, + extensions: RequestExtensions | None = None, ) -> typing.Iterator[Response]: """ Alternative to `httpx.request()` that streams the response body @@ -884,8 +883,8 @@ class Client(BaseClient): request: Request, *, stream: bool = False, - auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT, - follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT, + auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT, + follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, ) -> Response: """ Send a request. @@ -933,7 +932,7 @@ class Client(BaseClient): request: Request, auth: Auth, follow_redirects: bool, - history: typing.List[Response], + history: list[Response], ) -> Response: auth_flow = auth.sync_auth_flow(request) try: @@ -966,7 +965,7 @@ class Client(BaseClient): self, request: Request, follow_redirects: bool, - history: typing.List[Response], + history: list[Response], ) -> Response: while True: if len(history) > self.max_redirects: @@ -1039,13 +1038,13 @@ class Client(BaseClient): self, url: URLTypes, *, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT, - timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - extensions: typing.Optional[RequestExtensions] = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, + follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, + timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, + extensions: RequestExtensions | None = None, ) -> Response: """ Send a `GET` request. @@ -1068,13 +1067,13 @@ class Client(BaseClient): self, url: URLTypes, *, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT, - timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - extensions: typing.Optional[RequestExtensions] = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, + follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, + timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, + extensions: RequestExtensions | None = None, ) -> Response: """ Send an `OPTIONS` request. @@ -1097,13 +1096,13 @@ class Client(BaseClient): self, url: URLTypes, *, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT, - timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - extensions: typing.Optional[RequestExtensions] = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, + follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, + timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, + extensions: RequestExtensions | None = None, ) -> Response: """ Send a `HEAD` request. @@ -1126,17 +1125,17 @@ class Client(BaseClient): self, url: URLTypes, *, - content: typing.Optional[RequestContent] = None, - data: typing.Optional[RequestData] = None, - files: typing.Optional[RequestFiles] = None, - json: typing.Optional[typing.Any] = None, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT, - timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - extensions: typing.Optional[RequestExtensions] = None, + content: RequestContent | None = None, + data: RequestData | None = None, + files: RequestFiles | None = None, + json: typing.Any | None = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, + follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, + timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, + extensions: RequestExtensions | None = None, ) -> Response: """ Send a `POST` request. @@ -1163,17 +1162,17 @@ class Client(BaseClient): self, url: URLTypes, *, - content: typing.Optional[RequestContent] = None, - data: typing.Optional[RequestData] = None, - files: typing.Optional[RequestFiles] = None, - json: typing.Optional[typing.Any] = None, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT, - timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - extensions: typing.Optional[RequestExtensions] = None, + content: RequestContent | None = None, + data: RequestData | None = None, + files: RequestFiles | None = None, + json: typing.Any | None = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, + follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, + timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, + extensions: RequestExtensions | None = None, ) -> Response: """ Send a `PUT` request. @@ -1200,17 +1199,17 @@ class Client(BaseClient): self, url: URLTypes, *, - content: typing.Optional[RequestContent] = None, - data: typing.Optional[RequestData] = None, - files: typing.Optional[RequestFiles] = None, - json: typing.Optional[typing.Any] = None, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT, - timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - extensions: typing.Optional[RequestExtensions] = None, + content: RequestContent | None = None, + data: RequestData | None = None, + files: RequestFiles | None = None, + json: typing.Any | None = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, + follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, + timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, + extensions: RequestExtensions | None = None, ) -> Response: """ Send a `PATCH` request. @@ -1237,13 +1236,13 @@ class Client(BaseClient): self, url: URLTypes, *, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT, - timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - extensions: typing.Optional[RequestExtensions] = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, + follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, + timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, + extensions: RequestExtensions | None = None, ) -> Response: """ Send a `DELETE` request. @@ -1294,9 +1293,9 @@ class Client(BaseClient): def __exit__( self, - exc_type: typing.Optional[typing.Type[BaseException]] = None, - exc_value: typing.Optional[BaseException] = None, - traceback: typing.Optional[TracebackType] = None, + exc_type: type[BaseException] | None = None, + exc_value: BaseException | None = None, + traceback: TracebackType | None = None, ) -> None: self._state = ClientState.CLOSED @@ -1311,6 +1310,8 @@ class AsyncClient(BaseClient): An asynchronous HTTP client, with connection pooling, HTTP/2, redirects, cookie persistence, etc. + It can be shared between tasks. + Usage: ```python @@ -1362,31 +1363,28 @@ class AsyncClient(BaseClient): def __init__( self, *, - auth: typing.Optional[AuthTypes] = None, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, + auth: AuthTypes | None = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, verify: VerifyTypes = True, - cert: typing.Optional[CertTypes] = None, + cert: CertTypes | None = None, http1: bool = True, http2: bool = False, - proxy: typing.Optional[ProxyTypes] = None, - proxies: typing.Optional[ProxiesTypes] = None, - mounts: typing.Optional[ - typing.Mapping[str, typing.Optional[AsyncBaseTransport]] - ] = None, + proxy: ProxyTypes | None = None, + proxies: ProxiesTypes | None = None, + mounts: None | (typing.Mapping[str, AsyncBaseTransport | None]) = None, timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG, follow_redirects: bool = False, limits: Limits = DEFAULT_LIMITS, max_redirects: int = DEFAULT_MAX_REDIRECTS, - event_hooks: typing.Optional[ - typing.Mapping[str, typing.List[typing.Callable[..., typing.Any]]] - ] = None, + event_hooks: None + | (typing.Mapping[str, list[typing.Callable[..., typing.Any]]]) = None, base_url: URLTypes = "", - transport: typing.Optional[AsyncBaseTransport] = None, - app: typing.Optional[typing.Callable[..., typing.Any]] = None, + transport: AsyncBaseTransport | None = None, + app: typing.Callable[..., typing.Any] | None = None, trust_env: bool = True, - default_encoding: typing.Union[str, typing.Callable[[bytes], str]] = "utf-8", + default_encoding: str | typing.Callable[[bytes], str] = "utf-8", ) -> None: super().__init__( auth=auth, @@ -1420,7 +1418,14 @@ class AsyncClient(BaseClient): if proxy: raise RuntimeError("Use either `proxy` or 'proxies', not both.") - allow_env_proxies = trust_env and app is None and transport is None + if app: + message = ( + "The 'app' shortcut is now deprecated." + " Use the explicit style 'transport=ASGITransport(app=...)' instead." + ) + warnings.warn(message, DeprecationWarning) + + allow_env_proxies = trust_env and transport is None proxy_map = self._get_proxy_map(proxies or proxy, allow_env_proxies) self._transport = self._init_transport( @@ -1434,7 +1439,7 @@ class AsyncClient(BaseClient): trust_env=trust_env, ) - self._mounts: typing.Dict[URLPattern, typing.Optional[AsyncBaseTransport]] = { + self._mounts: dict[URLPattern, AsyncBaseTransport | None] = { URLPattern(key): None if proxy is None else self._init_proxy_transport( @@ -1457,12 +1462,12 @@ class AsyncClient(BaseClient): def _init_transport( self, verify: VerifyTypes = True, - cert: typing.Optional[CertTypes] = None, + cert: CertTypes | None = None, http1: bool = True, http2: bool = False, limits: Limits = DEFAULT_LIMITS, - transport: typing.Optional[AsyncBaseTransport] = None, - app: typing.Optional[typing.Callable[..., typing.Any]] = None, + transport: AsyncBaseTransport | None = None, + app: typing.Callable[..., typing.Any] | None = None, trust_env: bool = True, ) -> AsyncBaseTransport: if transport is not None: @@ -1484,7 +1489,7 @@ class AsyncClient(BaseClient): self, proxy: Proxy, verify: VerifyTypes = True, - cert: typing.Optional[CertTypes] = None, + cert: CertTypes | None = None, http1: bool = True, http2: bool = False, limits: Limits = DEFAULT_LIMITS, @@ -1493,6 +1498,7 @@ class AsyncClient(BaseClient): return AsyncHTTPTransport( verify=verify, cert=cert, + http1=http1, http2=http2, limits=limits, trust_env=trust_env, @@ -1515,17 +1521,17 @@ class AsyncClient(BaseClient): method: str, url: URLTypes, *, - content: typing.Optional[RequestContent] = None, - data: typing.Optional[RequestData] = None, - files: typing.Optional[RequestFiles] = None, - json: typing.Optional[typing.Any] = None, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT, - follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT, - timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - extensions: typing.Optional[RequestExtensions] = None, + content: RequestContent | None = None, + data: RequestData | None = None, + files: RequestFiles | None = None, + json: typing.Any | None = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT, + follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, + timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, + extensions: RequestExtensions | None = None, ) -> Response: """ Build and send a request. @@ -1543,6 +1549,15 @@ class AsyncClient(BaseClient): [0]: /advanced/#merging-of-configuration """ + + if cookies is not None: # pragma: no cover + message = ( + "Setting per-request cookies=<...> is being deprecated, because " + "the expected behaviour on cookie persistence is ambiguous. Set " + "cookies directly on the client instance instead." + ) + warnings.warn(message, DeprecationWarning) + request = self.build_request( method=method, url=url, @@ -1564,17 +1579,17 @@ class AsyncClient(BaseClient): method: str, url: URLTypes, *, - content: typing.Optional[RequestContent] = None, - data: typing.Optional[RequestData] = None, - files: typing.Optional[RequestFiles] = None, - json: typing.Optional[typing.Any] = None, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT, - timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - extensions: typing.Optional[RequestExtensions] = None, + content: RequestContent | None = None, + data: RequestData | None = None, + files: RequestFiles | None = None, + json: typing.Any | None = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, + follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, + timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, + extensions: RequestExtensions | None = None, ) -> typing.AsyncIterator[Response]: """ Alternative to `httpx.request()` that streams the response body @@ -1615,8 +1630,8 @@ class AsyncClient(BaseClient): request: Request, *, stream: bool = False, - auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT, - follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT, + auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT, + follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, ) -> Response: """ Send a request. @@ -1655,7 +1670,7 @@ class AsyncClient(BaseClient): return response - except BaseException as exc: # pragma: no cover + except BaseException as exc: await response.aclose() raise exc @@ -1664,7 +1679,7 @@ class AsyncClient(BaseClient): request: Request, auth: Auth, follow_redirects: bool, - history: typing.List[Response], + history: list[Response], ) -> Response: auth_flow = auth.async_auth_flow(request) try: @@ -1697,7 +1712,7 @@ class AsyncClient(BaseClient): self, request: Request, follow_redirects: bool, - history: typing.List[Response], + history: list[Response], ) -> Response: while True: if len(history) > self.max_redirects: @@ -1770,13 +1785,13 @@ class AsyncClient(BaseClient): self, url: URLTypes, *, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT, - follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT, - timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - extensions: typing.Optional[RequestExtensions] = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT, + follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, + timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, + extensions: RequestExtensions | None = None, ) -> Response: """ Send a `GET` request. @@ -1799,13 +1814,13 @@ class AsyncClient(BaseClient): self, url: URLTypes, *, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT, - timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - extensions: typing.Optional[RequestExtensions] = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, + follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, + timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, + extensions: RequestExtensions | None = None, ) -> Response: """ Send an `OPTIONS` request. @@ -1828,13 +1843,13 @@ class AsyncClient(BaseClient): self, url: URLTypes, *, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT, - timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - extensions: typing.Optional[RequestExtensions] = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, + follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, + timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, + extensions: RequestExtensions | None = None, ) -> Response: """ Send a `HEAD` request. @@ -1857,17 +1872,17 @@ class AsyncClient(BaseClient): self, url: URLTypes, *, - content: typing.Optional[RequestContent] = None, - data: typing.Optional[RequestData] = None, - files: typing.Optional[RequestFiles] = None, - json: typing.Optional[typing.Any] = None, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT, - timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - extensions: typing.Optional[RequestExtensions] = None, + content: RequestContent | None = None, + data: RequestData | None = None, + files: RequestFiles | None = None, + json: typing.Any | None = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, + follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, + timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, + extensions: RequestExtensions | None = None, ) -> Response: """ Send a `POST` request. @@ -1894,17 +1909,17 @@ class AsyncClient(BaseClient): self, url: URLTypes, *, - content: typing.Optional[RequestContent] = None, - data: typing.Optional[RequestData] = None, - files: typing.Optional[RequestFiles] = None, - json: typing.Optional[typing.Any] = None, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT, - timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - extensions: typing.Optional[RequestExtensions] = None, + content: RequestContent | None = None, + data: RequestData | None = None, + files: RequestFiles | None = None, + json: typing.Any | None = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, + follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, + timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, + extensions: RequestExtensions | None = None, ) -> Response: """ Send a `PUT` request. @@ -1931,17 +1946,17 @@ class AsyncClient(BaseClient): self, url: URLTypes, *, - content: typing.Optional[RequestContent] = None, - data: typing.Optional[RequestData] = None, - files: typing.Optional[RequestFiles] = None, - json: typing.Optional[typing.Any] = None, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT, - timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - extensions: typing.Optional[RequestExtensions] = None, + content: RequestContent | None = None, + data: RequestData | None = None, + files: RequestFiles | None = None, + json: typing.Any | None = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, + follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, + timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, + extensions: RequestExtensions | None = None, ) -> Response: """ Send a `PATCH` request. @@ -1968,13 +1983,13 @@ class AsyncClient(BaseClient): self, url: URLTypes, *, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT, - timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT, - extensions: typing.Optional[RequestExtensions] = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, + follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, + timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, + extensions: RequestExtensions | None = None, ) -> Response: """ Send a `DELETE` request. @@ -2025,9 +2040,9 @@ class AsyncClient(BaseClient): async def __aexit__( self, - exc_type: typing.Optional[typing.Type[BaseException]] = None, - exc_value: typing.Optional[BaseException] = None, - traceback: typing.Optional[TracebackType] = None, + exc_type: type[BaseException] | None = None, + exc_value: BaseException | None = None, + traceback: TracebackType | None = None, ) -> None: self._state = ClientState.CLOSED diff --git a/contrib/python/httpx/httpx/_config.py b/contrib/python/httpx/httpx/_config.py index d4fcfb5c0d..6a3ae8022c 100644 --- a/contrib/python/httpx/httpx/_config.py +++ b/contrib/python/httpx/httpx/_config.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import logging import os import ssl @@ -43,7 +45,7 @@ UNSET = UnsetType() def create_ssl_context( - cert: typing.Optional[CertTypes] = None, + cert: CertTypes | None = None, verify: VerifyTypes = True, trust_env: bool = True, http2: bool = False, @@ -67,7 +69,7 @@ class SSLConfig: def __init__( self, *, - cert: typing.Optional[CertTypes] = None, + cert: CertTypes | None = None, verify: VerifyTypes = True, trust_env: bool = True, http2: bool = False, @@ -192,7 +194,7 @@ class SSLConfig: ssl_context.load_cert_chain( certfile=self.cert[0], keyfile=self.cert[1], - password=self.cert[2], # type: ignore + password=self.cert[2], ) @@ -212,12 +214,12 @@ class Timeout: def __init__( self, - timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET, + timeout: TimeoutTypes | UnsetType = UNSET, *, - connect: typing.Union[None, float, UnsetType] = UNSET, - read: typing.Union[None, float, UnsetType] = UNSET, - write: typing.Union[None, float, UnsetType] = UNSET, - pool: typing.Union[None, float, UnsetType] = UNSET, + connect: None | float | UnsetType = UNSET, + read: None | float | UnsetType = UNSET, + write: None | float | UnsetType = UNSET, + pool: None | float | UnsetType = UNSET, ) -> None: if isinstance(timeout, Timeout): # Passed as a single explicit Timeout. @@ -256,7 +258,7 @@ class Timeout: self.write = timeout if isinstance(write, UnsetType) else write self.pool = timeout if isinstance(pool, UnsetType) else pool - def as_dict(self) -> typing.Dict[str, typing.Optional[float]]: + def as_dict(self) -> dict[str, float | None]: return { "connect": self.connect, "read": self.read, @@ -300,9 +302,9 @@ class Limits: def __init__( self, *, - max_connections: typing.Optional[int] = None, - max_keepalive_connections: typing.Optional[int] = None, - keepalive_expiry: typing.Optional[float] = 5.0, + max_connections: int | None = None, + max_keepalive_connections: int | None = None, + keepalive_expiry: float | None = 5.0, ) -> None: self.max_connections = max_connections self.max_keepalive_connections = max_keepalive_connections @@ -330,9 +332,9 @@ class Proxy: self, url: URLTypes, *, - ssl_context: typing.Optional[ssl.SSLContext] = None, - auth: typing.Optional[typing.Tuple[str, str]] = None, - headers: typing.Optional[HeaderTypes] = None, + ssl_context: ssl.SSLContext | None = None, + auth: tuple[str, str] | None = None, + headers: HeaderTypes | None = None, ) -> None: url = URL(url) headers = Headers(headers) @@ -351,7 +353,7 @@ class Proxy: self.ssl_context = ssl_context @property - def raw_auth(self) -> typing.Optional[typing.Tuple[bytes, bytes]]: + def raw_auth(self) -> tuple[bytes, bytes] | None: # The proxy authentication as raw bytes. return ( None diff --git a/contrib/python/httpx/httpx/_content.py b/contrib/python/httpx/httpx/_content.py index cd0d17f171..10b574bb3d 100644 --- a/contrib/python/httpx/httpx/_content.py +++ b/contrib/python/httpx/httpx/_content.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import inspect import warnings from json import dumps as json_dumps @@ -5,13 +7,9 @@ from typing import ( Any, AsyncIterable, AsyncIterator, - Dict, Iterable, Iterator, Mapping, - Optional, - Tuple, - Union, ) from urllib.parse import urlencode @@ -105,8 +103,8 @@ class UnattachedStream(AsyncByteStream, SyncByteStream): def encode_content( - content: Union[str, bytes, Iterable[bytes], AsyncIterable[bytes]], -) -> Tuple[Dict[str, str], Union[SyncByteStream, AsyncByteStream]]: + content: str | bytes | Iterable[bytes] | AsyncIterable[bytes], +) -> tuple[dict[str, str], SyncByteStream | AsyncByteStream]: if isinstance(content, (bytes, str)): body = content.encode("utf-8") if isinstance(content, str) else content content_length = len(body) @@ -135,7 +133,7 @@ def encode_content( def encode_urlencoded_data( data: RequestData, -) -> Tuple[Dict[str, str], ByteStream]: +) -> tuple[dict[str, str], ByteStream]: plain_data = [] for key, value in data.items(): if isinstance(value, (list, tuple)): @@ -150,14 +148,14 @@ def encode_urlencoded_data( def encode_multipart_data( - data: RequestData, files: RequestFiles, boundary: Optional[bytes] -) -> Tuple[Dict[str, str], MultipartStream]: + data: RequestData, files: RequestFiles, boundary: bytes | None +) -> tuple[dict[str, str], MultipartStream]: multipart = MultipartStream(data=data, files=files, boundary=boundary) headers = multipart.get_headers() return headers, multipart -def encode_text(text: str) -> Tuple[Dict[str, str], ByteStream]: +def encode_text(text: str) -> tuple[dict[str, str], ByteStream]: body = text.encode("utf-8") content_length = str(len(body)) content_type = "text/plain; charset=utf-8" @@ -165,7 +163,7 @@ def encode_text(text: str) -> Tuple[Dict[str, str], ByteStream]: return headers, ByteStream(body) -def encode_html(html: str) -> Tuple[Dict[str, str], ByteStream]: +def encode_html(html: str) -> tuple[dict[str, str], ByteStream]: body = html.encode("utf-8") content_length = str(len(body)) content_type = "text/html; charset=utf-8" @@ -173,7 +171,7 @@ def encode_html(html: str) -> Tuple[Dict[str, str], ByteStream]: return headers, ByteStream(body) -def encode_json(json: Any) -> Tuple[Dict[str, str], ByteStream]: +def encode_json(json: Any) -> tuple[dict[str, str], ByteStream]: body = json_dumps(json).encode("utf-8") content_length = str(len(body)) content_type = "application/json" @@ -182,12 +180,12 @@ def encode_json(json: Any) -> Tuple[Dict[str, str], ByteStream]: def encode_request( - content: Optional[RequestContent] = None, - data: Optional[RequestData] = None, - files: Optional[RequestFiles] = None, - json: Optional[Any] = None, - boundary: Optional[bytes] = None, -) -> Tuple[Dict[str, str], Union[SyncByteStream, AsyncByteStream]]: + content: RequestContent | None = None, + data: RequestData | None = None, + files: RequestFiles | None = None, + json: Any | None = None, + boundary: bytes | None = None, +) -> tuple[dict[str, str], SyncByteStream | AsyncByteStream]: """ Handles encoding the given `content`, `data`, `files`, and `json`, returning a two-tuple of (<headers>, <stream>). @@ -217,11 +215,11 @@ def encode_request( def encode_response( - content: Optional[ResponseContent] = None, - text: Optional[str] = None, - html: Optional[str] = None, - json: Optional[Any] = None, -) -> Tuple[Dict[str, str], Union[SyncByteStream, AsyncByteStream]]: + content: ResponseContent | None = None, + text: str | None = None, + html: str | None = None, + json: Any | None = None, +) -> tuple[dict[str, str], SyncByteStream | AsyncByteStream]: """ Handles encoding the given `content`, returning a two-tuple of (<headers>, <stream>). diff --git a/contrib/python/httpx/httpx/_decoders.py b/contrib/python/httpx/httpx/_decoders.py index 3f507c8e04..31c72c7f7a 100644 --- a/contrib/python/httpx/httpx/_decoders.py +++ b/contrib/python/httpx/httpx/_decoders.py @@ -3,6 +3,8 @@ Handlers for Content-Encoding. See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding """ +from __future__ import annotations + import codecs import io import typing @@ -167,11 +169,11 @@ class ByteChunker: Handles returning byte content in fixed-size chunks. """ - def __init__(self, chunk_size: typing.Optional[int] = None) -> None: + def __init__(self, chunk_size: int | None = None) -> None: self._buffer = io.BytesIO() self._chunk_size = chunk_size - def decode(self, content: bytes) -> typing.List[bytes]: + def decode(self, content: bytes) -> list[bytes]: if self._chunk_size is None: return [content] if content else [] @@ -194,7 +196,7 @@ class ByteChunker: else: return [] - def flush(self) -> typing.List[bytes]: + def flush(self) -> list[bytes]: value = self._buffer.getvalue() self._buffer.seek(0) self._buffer.truncate() @@ -206,11 +208,11 @@ class TextChunker: Handles returning text content in fixed-size chunks. """ - def __init__(self, chunk_size: typing.Optional[int] = None) -> None: + def __init__(self, chunk_size: int | None = None) -> None: self._buffer = io.StringIO() self._chunk_size = chunk_size - def decode(self, content: str) -> typing.List[str]: + def decode(self, content: str) -> list[str]: if self._chunk_size is None: return [content] if content else [] @@ -233,7 +235,7 @@ class TextChunker: else: return [] - def flush(self) -> typing.List[str]: + def flush(self) -> list[str]: value = self._buffer.getvalue() self._buffer.seek(0) self._buffer.truncate() @@ -264,10 +266,10 @@ class LineDecoder: """ def __init__(self) -> None: - self.buffer: typing.List[str] = [] + self.buffer: list[str] = [] self.trailing_cr: bool = False - def decode(self, text: str) -> typing.List[str]: + def decode(self, text: str) -> list[str]: # See https://docs.python.org/3/library/stdtypes.html#str.splitlines NEWLINE_CHARS = "\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029" @@ -305,7 +307,7 @@ class LineDecoder: return lines - def flush(self) -> typing.List[str]: + def flush(self) -> list[str]: if not self.buffer and not self.trailing_cr: return [] diff --git a/contrib/python/httpx/httpx/_exceptions.py b/contrib/python/httpx/httpx/_exceptions.py index 123692955b..11424621c0 100644 --- a/contrib/python/httpx/httpx/_exceptions.py +++ b/contrib/python/httpx/httpx/_exceptions.py @@ -30,6 +30,8 @@ Our exception hierarchy: x ResponseNotRead x RequestNotRead """ +from __future__ import annotations + import contextlib import typing @@ -57,16 +59,16 @@ class HTTPError(Exception): def __init__(self, message: str) -> None: super().__init__(message) - self._request: typing.Optional["Request"] = None + self._request: Request | None = None @property - def request(self) -> "Request": + def request(self) -> Request: if self._request is None: raise RuntimeError("The .request property has not been set.") return self._request @request.setter - def request(self, request: "Request") -> None: + def request(self, request: Request) -> None: self._request = request @@ -75,9 +77,7 @@ class RequestError(HTTPError): Base class for all exceptions that may occur when issuing a `.request()`. """ - def __init__( - self, message: str, *, request: typing.Optional["Request"] = None - ) -> None: + def __init__(self, message: str, *, request: Request | None = None) -> None: super().__init__(message) # At the point an exception is raised we won't typically have a request # instance to associate it with. @@ -230,9 +230,7 @@ class HTTPStatusError(HTTPError): May be raised when calling `response.raise_for_status()` """ - def __init__( - self, message: str, *, request: "Request", response: "Response" - ) -> None: + def __init__(self, message: str, *, request: Request, response: Response) -> None: super().__init__(message) self.request = request self.response = response @@ -335,7 +333,7 @@ class RequestNotRead(StreamError): @contextlib.contextmanager def request_context( - request: typing.Optional["Request"] = None, + request: Request | None = None, ) -> typing.Iterator[None]: """ A context manager that can be used to attach the given request context diff --git a/contrib/python/httpx/httpx/_main.py b/contrib/python/httpx/httpx/_main.py index adb57d5fc0..72657f8ca3 100644 --- a/contrib/python/httpx/httpx/_main.py +++ b/contrib/python/httpx/httpx/_main.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import functools import json import sys @@ -125,8 +127,8 @@ def format_request_headers(request: httpcore.Request, http2: bool = False) -> st def format_response_headers( http_version: bytes, status: int, - reason_phrase: typing.Optional[bytes], - headers: typing.List[typing.Tuple[bytes, bytes]], + reason_phrase: bytes | None, + headers: list[tuple[bytes, bytes]], ) -> str: version = http_version.decode("ascii") reason = ( @@ -152,8 +154,8 @@ def print_request_headers(request: httpcore.Request, http2: bool = False) -> Non def print_response_headers( http_version: bytes, status: int, - reason_phrase: typing.Optional[bytes], - headers: typing.List[typing.Tuple[bytes, bytes]], + reason_phrase: bytes | None, + headers: list[tuple[bytes, bytes]], ) -> None: console = rich.console.Console() http_text = format_response_headers(http_version, status, reason_phrase, headers) @@ -268,7 +270,7 @@ def download_response(response: Response, download: typing.BinaryIO) -> None: def validate_json( ctx: click.Context, - param: typing.Union[click.Option, click.Parameter], + param: click.Option | click.Parameter, value: typing.Any, ) -> typing.Any: if value is None: @@ -282,7 +284,7 @@ def validate_json( def validate_auth( ctx: click.Context, - param: typing.Union[click.Option, click.Parameter], + param: click.Option | click.Parameter, value: typing.Any, ) -> typing.Any: if value == (None, None): @@ -296,7 +298,7 @@ def validate_auth( def handle_help( ctx: click.Context, - param: typing.Union[click.Option, click.Parameter], + param: click.Option | click.Parameter, value: typing.Any, ) -> None: if not value or ctx.resilient_parsing: @@ -448,20 +450,20 @@ def handle_help( def main( url: str, method: str, - params: typing.List[typing.Tuple[str, str]], + params: list[tuple[str, str]], content: str, - data: typing.List[typing.Tuple[str, str]], - files: typing.List[typing.Tuple[str, click.File]], + data: list[tuple[str, str]], + files: list[tuple[str, click.File]], json: str, - headers: typing.List[typing.Tuple[str, str]], - cookies: typing.List[typing.Tuple[str, str]], - auth: typing.Optional[typing.Tuple[str, str]], + headers: list[tuple[str, str]], + cookies: list[tuple[str, str]], + auth: tuple[str, str] | None, proxy: str, timeout: float, follow_redirects: bool, verify: bool, http2: bool, - download: typing.Optional[typing.BinaryIO], + download: typing.BinaryIO | None, verbose: bool, ) -> None: """ diff --git a/contrib/python/httpx/httpx/_models.py b/contrib/python/httpx/httpx/_models.py index b8617cdab5..cd76705f1a 100644 --- a/contrib/python/httpx/httpx/_models.py +++ b/contrib/python/httpx/httpx/_models.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import datetime import email.message import json as jsonlib @@ -59,8 +61,8 @@ class Headers(typing.MutableMapping[str, str]): def __init__( self, - headers: typing.Optional[HeaderTypes] = None, - encoding: typing.Optional[str] = None, + headers: HeaderTypes | None = None, + encoding: str | None = None, ) -> None: if headers is None: self._list = [] # type: typing.List[typing.Tuple[bytes, bytes, bytes]] @@ -117,7 +119,7 @@ class Headers(typing.MutableMapping[str, str]): self._encoding = value @property - def raw(self) -> typing.List[typing.Tuple[bytes, bytes]]: + def raw(self) -> list[tuple[bytes, bytes]]: """ Returns a list of the raw header items, as byte pairs. """ @@ -127,7 +129,7 @@ class Headers(typing.MutableMapping[str, str]): return {key.decode(self.encoding): None for _, key, value in self._list}.keys() def values(self) -> typing.ValuesView[str]: - values_dict: typing.Dict[str, str] = {} + values_dict: dict[str, str] = {} for _, key, value in self._list: str_key = key.decode(self.encoding) str_value = value.decode(self.encoding) @@ -142,7 +144,7 @@ class Headers(typing.MutableMapping[str, str]): Return `(key, value)` items of headers. Concatenate headers into a single comma separated value when a key occurs multiple times. """ - values_dict: typing.Dict[str, str] = {} + values_dict: dict[str, str] = {} for _, key, value in self._list: str_key = key.decode(self.encoding) str_value = value.decode(self.encoding) @@ -152,7 +154,7 @@ class Headers(typing.MutableMapping[str, str]): values_dict[str_key] = str_value return values_dict.items() - def multi_items(self) -> typing.List[typing.Tuple[str, str]]: + def multi_items(self) -> list[tuple[str, str]]: """ Return a list of `(key, value)` pairs of headers. Allow multiple occurrences of the same key without concatenating into a single @@ -173,7 +175,7 @@ class Headers(typing.MutableMapping[str, str]): except KeyError: return default - def get_list(self, key: str, split_commas: bool = False) -> typing.List[str]: + def get_list(self, key: str, split_commas: bool = False) -> list[str]: """ Return a list of all header values for a given key. If `split_commas=True` is passed, then any comma separated header @@ -195,14 +197,14 @@ class Headers(typing.MutableMapping[str, str]): split_values.extend([item.strip() for item in value.split(",")]) return split_values - def update(self, headers: typing.Optional[HeaderTypes] = None) -> None: # type: ignore + def update(self, headers: HeaderTypes | None = None) -> None: # type: ignore headers = Headers(headers) for key in headers.keys(): if key in self: self.pop(key) self._list.extend(headers._list) - def copy(self) -> "Headers": + def copy(self) -> Headers: return Headers(self, encoding=self.encoding) def __getitem__(self, key: str) -> str: @@ -306,18 +308,18 @@ class Headers(typing.MutableMapping[str, str]): class Request: def __init__( self, - method: typing.Union[str, bytes], - url: typing.Union["URL", str], + method: str | bytes, + url: URL | str, *, - params: typing.Optional[QueryParamTypes] = None, - headers: typing.Optional[HeaderTypes] = None, - cookies: typing.Optional[CookieTypes] = None, - content: typing.Optional[RequestContent] = None, - data: typing.Optional[RequestData] = None, - files: typing.Optional[RequestFiles] = None, - json: typing.Optional[typing.Any] = None, - stream: typing.Union[SyncByteStream, AsyncByteStream, None] = None, - extensions: typing.Optional[RequestExtensions] = None, + params: QueryParamTypes | None = None, + headers: HeaderTypes | None = None, + cookies: CookieTypes | None = None, + content: RequestContent | None = None, + data: RequestData | None = None, + files: RequestFiles | None = None, + json: typing.Any | None = None, + stream: SyncByteStream | AsyncByteStream | None = None, + extensions: RequestExtensions | None = None, ) -> None: self.method = ( method.decode("ascii").upper() @@ -334,7 +336,7 @@ class Request: Cookies(cookies).set_cookie_header(self) if stream is None: - content_type: typing.Optional[str] = self.headers.get("content-type") + content_type: str | None = self.headers.get("content-type") headers, stream = encode_request( content=content, data=data, @@ -368,14 +370,14 @@ class Request: # * Creating request instances on the *server-side* of the transport API. self.stream = stream - def _prepare(self, default_headers: typing.Dict[str, str]) -> None: + def _prepare(self, default_headers: dict[str, str]) -> None: for key, value in default_headers.items(): # Ignore Transfer-Encoding if the Content-Length has been set explicitly. if key.lower() == "transfer-encoding" and "Content-Length" in self.headers: continue self.headers.setdefault(key, value) - auto_headers: typing.List[typing.Tuple[bytes, bytes]] = [] + auto_headers: list[tuple[bytes, bytes]] = [] has_host = "Host" in self.headers has_content_length = ( @@ -428,14 +430,14 @@ class Request: url = str(self.url) return f"<{class_name}({self.method!r}, {url!r})>" - def __getstate__(self) -> typing.Dict[str, typing.Any]: + def __getstate__(self) -> dict[str, typing.Any]: return { name: value for name, value in self.__dict__.items() if name not in ["extensions", "stream"] } - def __setstate__(self, state: typing.Dict[str, typing.Any]) -> None: + def __setstate__(self, state: dict[str, typing.Any]) -> None: for name, value in state.items(): setattr(self, name, value) self.extensions = {} @@ -447,25 +449,25 @@ class Response: self, status_code: int, *, - headers: typing.Optional[HeaderTypes] = None, - content: typing.Optional[ResponseContent] = None, - text: typing.Optional[str] = None, - html: typing.Optional[str] = None, + headers: HeaderTypes | None = None, + content: ResponseContent | None = None, + text: str | None = None, + html: str | None = None, json: typing.Any = None, - stream: typing.Union[SyncByteStream, AsyncByteStream, None] = None, - request: typing.Optional[Request] = None, - extensions: typing.Optional[ResponseExtensions] = None, - history: typing.Optional[typing.List["Response"]] = None, - default_encoding: typing.Union[str, typing.Callable[[bytes], str]] = "utf-8", + stream: SyncByteStream | AsyncByteStream | None = None, + request: Request | None = None, + extensions: ResponseExtensions | None = None, + history: list[Response] | None = None, + default_encoding: str | typing.Callable[[bytes], str] = "utf-8", ) -> None: self.status_code = status_code self.headers = Headers(headers) - self._request: typing.Optional[Request] = request + self._request: Request | None = request # When follow_redirects=False and a redirect is received, # the client will set `response.next_request`. - self.next_request: typing.Optional[Request] = None + self.next_request: Request | None = None self.extensions: ResponseExtensions = {} if extensions is None else extensions self.history = [] if history is None else list(history) @@ -498,7 +500,7 @@ class Response: self._num_bytes_downloaded = 0 - def _prepare(self, default_headers: typing.Dict[str, str]) -> None: + def _prepare(self, default_headers: dict[str, str]) -> None: for key, value in default_headers.items(): # Ignore Transfer-Encoding if the Content-Length has been set explicitly. if key.lower() == "transfer-encoding" and "content-length" in self.headers: @@ -580,7 +582,7 @@ class Response: return self._text @property - def encoding(self) -> typing.Optional[str]: + def encoding(self) -> str | None: """ Return an encoding to use for decoding the byte content into text. The priority for determining this is given by... @@ -616,7 +618,7 @@ class Response: self._encoding = value @property - def charset_encoding(self) -> typing.Optional[str]: + def charset_encoding(self) -> str | None: """ Return the encoding, as specified by the Content-Type header. """ @@ -632,7 +634,7 @@ class Response: content, depending on the Content-Encoding used in the response. """ if not hasattr(self, "_decoder"): - decoders: typing.List[ContentDecoder] = [] + decoders: list[ContentDecoder] = [] values = self.headers.get_list("content-encoding", split_commas=True) for value in values: value = value.strip().lower() @@ -721,7 +723,7 @@ class Response: and "Location" in self.headers ) - def raise_for_status(self) -> "Response": + def raise_for_status(self) -> Response: """ Raise the `HTTPStatusError` if one occurred. """ @@ -762,25 +764,25 @@ class Response: return jsonlib.loads(self.content, **kwargs) @property - def cookies(self) -> "Cookies": + def cookies(self) -> Cookies: if not hasattr(self, "_cookies"): self._cookies = Cookies() self._cookies.extract_cookies(self) return self._cookies @property - def links(self) -> typing.Dict[typing.Optional[str], typing.Dict[str, str]]: + def links(self) -> dict[str | None, dict[str, str]]: """ Returns the parsed header links of the response, if any """ header = self.headers.get("link") - ldict = {} - if header: - links = parse_header_links(header) - for link in links: - key = link.get("rel") or link.get("url") - ldict[key] = link - return ldict + if header is None: + return {} + + return { + (link.get("rel") or link.get("url")): link + for link in parse_header_links(header) + } @property def num_bytes_downloaded(self) -> int: @@ -789,14 +791,14 @@ class Response: def __repr__(self) -> str: return f"<Response [{self.status_code} {self.reason_phrase}]>" - def __getstate__(self) -> typing.Dict[str, typing.Any]: + def __getstate__(self) -> dict[str, typing.Any]: return { name: value for name, value in self.__dict__.items() if name not in ["extensions", "stream", "is_closed", "_decoder"] } - def __setstate__(self, state: typing.Dict[str, typing.Any]) -> None: + def __setstate__(self, state: dict[str, typing.Any]) -> None: for name, value in state.items(): setattr(self, name, value) self.is_closed = True @@ -811,9 +813,7 @@ class Response: self._content = b"".join(self.iter_bytes()) return self._content - def iter_bytes( - self, chunk_size: typing.Optional[int] = None - ) -> typing.Iterator[bytes]: + def iter_bytes(self, chunk_size: int | None = None) -> typing.Iterator[bytes]: """ A byte-iterator over the decoded response content. This allows us to handle gzip, deflate, and brotli encoded responses. @@ -836,9 +836,7 @@ class Response: for chunk in chunker.flush(): yield chunk - def iter_text( - self, chunk_size: typing.Optional[int] = None - ) -> typing.Iterator[str]: + def iter_text(self, chunk_size: int | None = None) -> typing.Iterator[str]: """ A str-iterator over the decoded response content that handles both gzip, deflate, etc but also detects the content's @@ -866,9 +864,7 @@ class Response: for line in decoder.flush(): yield line - def iter_raw( - self, chunk_size: typing.Optional[int] = None - ) -> typing.Iterator[bytes]: + def iter_raw(self, chunk_size: int | None = None) -> typing.Iterator[bytes]: """ A byte-iterator over the raw response content. """ @@ -916,7 +912,7 @@ class Response: return self._content async def aiter_bytes( - self, chunk_size: typing.Optional[int] = None + self, chunk_size: int | None = None ) -> typing.AsyncIterator[bytes]: """ A byte-iterator over the decoded response content. @@ -941,7 +937,7 @@ class Response: yield chunk async def aiter_text( - self, chunk_size: typing.Optional[int] = None + self, chunk_size: int | None = None ) -> typing.AsyncIterator[str]: """ A str-iterator over the decoded response content @@ -971,7 +967,7 @@ class Response: yield line async def aiter_raw( - self, chunk_size: typing.Optional[int] = None + self, chunk_size: int | None = None ) -> typing.AsyncIterator[bytes]: """ A byte-iterator over the raw response content. @@ -1017,7 +1013,7 @@ class Cookies(typing.MutableMapping[str, str]): HTTP Cookies, as a mutable mapping. """ - def __init__(self, cookies: typing.Optional[CookieTypes] = None) -> None: + def __init__(self, cookies: CookieTypes | None = None) -> None: if cookies is None or isinstance(cookies, dict): self.jar = CookieJar() if isinstance(cookies, dict): @@ -1079,10 +1075,10 @@ class Cookies(typing.MutableMapping[str, str]): def get( # type: ignore self, name: str, - default: typing.Optional[str] = None, - domain: typing.Optional[str] = None, - path: typing.Optional[str] = None, - ) -> typing.Optional[str]: + default: str | None = None, + domain: str | None = None, + path: str | None = None, + ) -> str | None: """ Get a cookie by name. May optionally include domain and path in order to specify exactly which cookie to retrieve. @@ -1104,8 +1100,8 @@ class Cookies(typing.MutableMapping[str, str]): def delete( self, name: str, - domain: typing.Optional[str] = None, - path: typing.Optional[str] = None, + domain: str | None = None, + path: str | None = None, ) -> None: """ Delete a cookie by name. May optionally include domain and path @@ -1125,9 +1121,7 @@ class Cookies(typing.MutableMapping[str, str]): for cookie in remove: self.jar.clear(cookie.domain, cookie.path, cookie.name) - def clear( - self, domain: typing.Optional[str] = None, path: typing.Optional[str] = None - ) -> None: + def clear(self, domain: str | None = None, path: str | None = None) -> None: """ Delete all cookies. Optionally include a domain and path in order to only delete a subset of all the cookies. @@ -1140,7 +1134,7 @@ class Cookies(typing.MutableMapping[str, str]): args.append(path) self.jar.clear(*args) - def update(self, cookies: typing.Optional[CookieTypes] = None) -> None: # type: ignore + def update(self, cookies: CookieTypes | None = None) -> None: # type: ignore cookies = Cookies(cookies) for cookie in cookies.jar: self.jar.set_cookie(cookie) diff --git a/contrib/python/httpx/httpx/_multipart.py b/contrib/python/httpx/httpx/_multipart.py index 5122d5114f..8edb622778 100644 --- a/contrib/python/httpx/httpx/_multipart.py +++ b/contrib/python/httpx/httpx/_multipart.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import io import os import typing @@ -21,8 +23,8 @@ from ._utils import ( def get_multipart_boundary_from_content_type( - content_type: typing.Optional[bytes], -) -> typing.Optional[bytes]: + content_type: bytes | None, +) -> bytes | None: if not content_type or not content_type.startswith(b"multipart/form-data"): return None # parse boundary according to @@ -39,9 +41,7 @@ class DataField: A single form field item, within a multipart form field. """ - def __init__( - self, name: str, value: typing.Union[str, bytes, int, float, None] - ) -> None: + def __init__(self, name: str, value: str | bytes | int | float | None) -> None: if not isinstance(name, str): raise TypeError( f"Invalid type for name. Expected str, got {type(name)}: {name!r}" @@ -52,7 +52,7 @@ class DataField: f" got {type(value)}: {value!r}" ) self.name = name - self.value: typing.Union[str, bytes] = ( + self.value: str | bytes = ( value if isinstance(value, bytes) else primitive_value_to_str(value) ) @@ -93,8 +93,8 @@ class FileField: fileobj: FileContent - headers: typing.Dict[str, str] = {} - content_type: typing.Optional[str] = None + headers: dict[str, str] = {} + content_type: str | None = None # This large tuple based API largely mirror's requests' API # It would be good to think of better APIs for this that we could @@ -104,9 +104,9 @@ class FileField: if len(value) == 2: # neither the 3rd parameter (content_type) nor the 4th (headers) # was included - filename, fileobj = value # type: ignore + filename, fileobj = value elif len(value) == 3: - filename, fileobj, content_type = value # type: ignore + filename, fileobj, content_type = value else: # all 4 parameters included filename, fileobj, content_type, headers = value # type: ignore @@ -137,7 +137,7 @@ class FileField: self.file = fileobj self.headers = headers - def get_length(self) -> typing.Optional[int]: + def get_length(self) -> int | None: headers = self.render_headers() if isinstance(self.file, (str, bytes)): @@ -199,7 +199,7 @@ class MultipartStream(SyncByteStream, AsyncByteStream): self, data: RequestData, files: RequestFiles, - boundary: typing.Optional[bytes] = None, + boundary: bytes | None = None, ) -> None: if boundary is None: boundary = os.urandom(16).hex().encode("ascii") @@ -212,7 +212,7 @@ class MultipartStream(SyncByteStream, AsyncByteStream): def _iter_fields( self, data: RequestData, files: RequestFiles - ) -> typing.Iterator[typing.Union[FileField, DataField]]: + ) -> typing.Iterator[FileField | DataField]: for name, value in data.items(): if isinstance(value, (tuple, list)): for item in value: @@ -231,7 +231,7 @@ class MultipartStream(SyncByteStream, AsyncByteStream): yield b"\r\n" yield b"--%s--\r\n" % self.boundary - def get_content_length(self) -> typing.Optional[int]: + def get_content_length(self) -> int | None: """ Return the length of the multipart encoded content, or `None` if any of the files have a length that cannot be determined upfront. @@ -253,7 +253,7 @@ class MultipartStream(SyncByteStream, AsyncByteStream): # Content stream interface. - def get_headers(self) -> typing.Dict[str, str]: + def get_headers(self) -> dict[str, str]: content_length = self.get_content_length() content_type = self.content_type if content_length is None: diff --git a/contrib/python/httpx/httpx/_status_codes.py b/contrib/python/httpx/httpx/_status_codes.py index 671c30e1b8..4cde4e6845 100644 --- a/contrib/python/httpx/httpx/_status_codes.py +++ b/contrib/python/httpx/httpx/_status_codes.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from enum import IntEnum @@ -21,7 +23,7 @@ class codes(IntEnum): * RFC 8470: Using Early Data in HTTP """ - def __new__(cls, value: int, phrase: str = "") -> "codes": + def __new__(cls, value: int, phrase: str = "") -> codes: obj = int.__new__(cls, value) obj._value_ = value diff --git a/contrib/python/httpx/httpx/_transports/asgi.py b/contrib/python/httpx/httpx/_transports/asgi.py index 08cd392f75..9543a12861 100644 --- a/contrib/python/httpx/httpx/_transports/asgi.py +++ b/contrib/python/httpx/httpx/_transports/asgi.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import typing import sniffio @@ -24,7 +26,7 @@ _ASGIApp = typing.Callable[ ] -def create_event() -> "Event": +def create_event() -> Event: if sniffio.current_async_library() == "trio": import trio @@ -36,7 +38,7 @@ def create_event() -> "Event": class ASGIResponseStream(AsyncByteStream): - def __init__(self, body: typing.List[bytes]) -> None: + def __init__(self, body: list[bytes]) -> None: self._body = body async def __aiter__(self) -> typing.AsyncIterator[bytes]: @@ -81,7 +83,7 @@ class ASGITransport(AsyncBaseTransport): app: _ASGIApp, raise_app_exceptions: bool = True, root_path: str = "", - client: typing.Tuple[str, int] = ("127.0.0.1", 123), + client: tuple[str, int] = ("127.0.0.1", 123), ) -> None: self.app = app self.raise_app_exceptions = raise_app_exceptions @@ -123,7 +125,7 @@ class ASGITransport(AsyncBaseTransport): # ASGI callables. - async def receive() -> typing.Dict[str, typing.Any]: + async def receive() -> dict[str, typing.Any]: nonlocal request_complete if request_complete: @@ -137,7 +139,7 @@ class ASGITransport(AsyncBaseTransport): return {"type": "http.request", "body": b"", "more_body": False} return {"type": "http.request", "body": body, "more_body": True} - async def send(message: typing.Dict[str, typing.Any]) -> None: + async def send(message: dict[str, typing.Any]) -> None: nonlocal status_code, response_headers, response_started if message["type"] == "http.response.start": diff --git a/contrib/python/httpx/httpx/_transports/base.py b/contrib/python/httpx/httpx/_transports/base.py index f6fdfe6943..8b6dc3c239 100644 --- a/contrib/python/httpx/httpx/_transports/base.py +++ b/contrib/python/httpx/httpx/_transports/base.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import typing from types import TracebackType @@ -13,9 +15,9 @@ class BaseTransport: def __exit__( self, - exc_type: typing.Optional[typing.Type[BaseException]] = None, - exc_value: typing.Optional[BaseException] = None, - traceback: typing.Optional[TracebackType] = None, + exc_type: type[BaseException] | None = None, + exc_value: BaseException | None = None, + traceback: TracebackType | None = None, ) -> None: self.close() @@ -64,9 +66,9 @@ class AsyncBaseTransport: async def __aexit__( self, - exc_type: typing.Optional[typing.Type[BaseException]] = None, - exc_value: typing.Optional[BaseException] = None, - traceback: typing.Optional[TracebackType] = None, + exc_type: type[BaseException] | None = None, + exc_value: BaseException | None = None, + traceback: TracebackType | None = None, ) -> None: await self.aclose() diff --git a/contrib/python/httpx/httpx/_transports/default.py b/contrib/python/httpx/httpx/_transports/default.py index 14a087389a..14476a3ce3 100644 --- a/contrib/python/httpx/httpx/_transports/default.py +++ b/contrib/python/httpx/httpx/_transports/default.py @@ -23,6 +23,8 @@ client = httpx.Client(transport=transport) transport = httpx.HTTPTransport(uds="socket.uds") client = httpx.Client(transport=transport) """ +from __future__ import annotations + import contextlib import typing from types import TracebackType @@ -120,16 +122,16 @@ class HTTPTransport(BaseTransport): def __init__( self, verify: VerifyTypes = True, - cert: typing.Optional[CertTypes] = None, + cert: CertTypes | None = None, http1: bool = True, http2: bool = False, limits: Limits = DEFAULT_LIMITS, trust_env: bool = True, - proxy: typing.Optional[ProxyTypes] = None, - uds: typing.Optional[str] = None, - local_address: typing.Optional[str] = None, + proxy: ProxyTypes | None = None, + uds: str | None = None, + local_address: str | None = None, retries: int = 0, - socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None, + socket_options: typing.Iterable[SOCKET_OPTION] | None = None, ) -> None: ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env) proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy @@ -202,9 +204,9 @@ class HTTPTransport(BaseTransport): def __exit__( self, - exc_type: typing.Optional[typing.Type[BaseException]] = None, - exc_value: typing.Optional[BaseException] = None, - traceback: typing.Optional[TracebackType] = None, + exc_type: type[BaseException] | None = None, + exc_value: BaseException | None = None, + traceback: TracebackType | None = None, ) -> None: with map_httpcore_exceptions(): self._pool.__exit__(exc_type, exc_value, traceback) @@ -261,16 +263,16 @@ class AsyncHTTPTransport(AsyncBaseTransport): def __init__( self, verify: VerifyTypes = True, - cert: typing.Optional[CertTypes] = None, + cert: CertTypes | None = None, http1: bool = True, http2: bool = False, limits: Limits = DEFAULT_LIMITS, trust_env: bool = True, - proxy: typing.Optional[ProxyTypes] = None, - uds: typing.Optional[str] = None, - local_address: typing.Optional[str] = None, + proxy: ProxyTypes | None = None, + uds: str | None = None, + local_address: str | None = None, retries: int = 0, - socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None, + socket_options: typing.Iterable[SOCKET_OPTION] | None = None, ) -> None: ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env) proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy @@ -342,9 +344,9 @@ class AsyncHTTPTransport(AsyncBaseTransport): async def __aexit__( self, - exc_type: typing.Optional[typing.Type[BaseException]] = None, - exc_value: typing.Optional[BaseException] = None, - traceback: typing.Optional[TracebackType] = None, + exc_type: type[BaseException] | None = None, + exc_value: BaseException | None = None, + traceback: TracebackType | None = None, ) -> None: with map_httpcore_exceptions(): await self._pool.__aexit__(exc_type, exc_value, traceback) diff --git a/contrib/python/httpx/httpx/_transports/mock.py b/contrib/python/httpx/httpx/_transports/mock.py index 82043da2d9..5abea83731 100644 --- a/contrib/python/httpx/httpx/_transports/mock.py +++ b/contrib/python/httpx/httpx/_transports/mock.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import typing from .._models import Request, Response @@ -8,7 +10,7 @@ AsyncHandler = typing.Callable[[Request], typing.Coroutine[None, None, Response] class MockTransport(AsyncBaseTransport, BaseTransport): - def __init__(self, handler: typing.Union[SyncHandler, AsyncHandler]) -> None: + def __init__(self, handler: SyncHandler | AsyncHandler) -> None: self.handler = handler def handle_request( diff --git a/contrib/python/httpx/httpx/_transports/wsgi.py b/contrib/python/httpx/httpx/_transports/wsgi.py index a23d42c414..cd03a9417b 100644 --- a/contrib/python/httpx/httpx/_transports/wsgi.py +++ b/contrib/python/httpx/httpx/_transports/wsgi.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import io import itertools import sys @@ -71,11 +73,11 @@ class WSGITransport(BaseTransport): def __init__( self, - app: "WSGIApplication", + app: WSGIApplication, raise_app_exceptions: bool = True, script_name: str = "", remote_addr: str = "127.0.0.1", - wsgi_errors: typing.Optional[typing.TextIO] = None, + wsgi_errors: typing.TextIO | None = None, ) -> None: self.app = app self.raise_app_exceptions = raise_app_exceptions @@ -117,8 +119,8 @@ class WSGITransport(BaseTransport): def start_response( status: str, - response_headers: typing.List[typing.Tuple[str, str]], - exc_info: typing.Optional["OptExcInfo"] = None, + response_headers: list[tuple[str, str]], + exc_info: OptExcInfo | None = None, ) -> typing.Callable[[bytes], typing.Any]: nonlocal seen_status, seen_response_headers, seen_exc_info seen_status = status diff --git a/contrib/python/httpx/httpx/_urlparse.py b/contrib/python/httpx/httpx/_urlparse.py index 07bbea9070..6a4b55b38c 100644 --- a/contrib/python/httpx/httpx/_urlparse.py +++ b/contrib/python/httpx/httpx/_urlparse.py @@ -15,6 +15,8 @@ Previously we relied on the excellent `rfc3986` package to handle URL parsing an validation, but this module provides a simpler alternative, with less indirection required. """ +from __future__ import annotations + import ipaddress import re import typing @@ -95,10 +97,10 @@ class ParseResult(typing.NamedTuple): scheme: str userinfo: str host: str - port: typing.Optional[int] + port: int | None path: str - query: typing.Optional[str] - fragment: typing.Optional[str] + query: str | None + fragment: str | None @property def authority(self) -> str: @@ -119,7 +121,7 @@ class ParseResult(typing.NamedTuple): ] ) - def copy_with(self, **kwargs: typing.Optional[str]) -> "ParseResult": + def copy_with(self, **kwargs: str | None) -> ParseResult: if not kwargs: return self @@ -146,7 +148,7 @@ class ParseResult(typing.NamedTuple): ) -def urlparse(url: str = "", **kwargs: typing.Optional[str]) -> ParseResult: +def urlparse(url: str = "", **kwargs: str | None) -> ParseResult: # Initial basic checks on allowable URLs. # --------------------------------------- @@ -243,7 +245,7 @@ def urlparse(url: str = "", **kwargs: typing.Optional[str]) -> ParseResult: parsed_scheme: str = scheme.lower() parsed_userinfo: str = quote(userinfo, safe=SUB_DELIMS + ":") parsed_host: str = encode_host(host) - parsed_port: typing.Optional[int] = normalize_port(port, scheme) + parsed_port: int | None = normalize_port(port, scheme) has_scheme = parsed_scheme != "" has_authority = ( @@ -260,11 +262,11 @@ def urlparse(url: str = "", **kwargs: typing.Optional[str]) -> ParseResult: # For 'path' we need to drop ? and # from the GEN_DELIMS set. parsed_path: str = quote(path, safe=SUB_DELIMS + ":/[]@") # For 'query' we need to drop '#' from the GEN_DELIMS set. - parsed_query: typing.Optional[str] = ( + parsed_query: str | None = ( None if query is None else quote(query, safe=SUB_DELIMS + ":/?[]@") ) # For 'fragment' we can include all of the GEN_DELIMS set. - parsed_fragment: typing.Optional[str] = ( + parsed_fragment: str | None = ( None if fragment is None else quote(fragment, safe=SUB_DELIMS + ":/?#[]@") ) @@ -327,9 +329,7 @@ def encode_host(host: str) -> str: raise InvalidURL(f"Invalid IDNA hostname: {host!r}") -def normalize_port( - port: typing.Optional[typing.Union[str, int]], scheme: str -) -> typing.Optional[int]: +def normalize_port(port: str | int | None, scheme: str) -> int | None: # From https://tools.ietf.org/html/rfc3986#section-3.2.3 # # "A scheme may define a default port. For example, the "http" scheme @@ -393,7 +393,7 @@ def normalize_path(path: str) -> str: """ # https://datatracker.ietf.org/doc/html/rfc3986#section-5.2.4 components = path.split("/") - output: typing.List[str] = [] + output: list[str] = [] for component in components: if component == ".": pass @@ -479,7 +479,7 @@ def quote(string: str, safe: str = "/") -> str: return "".join(parts) -def urlencode(items: typing.List[typing.Tuple[str, str]]) -> str: +def urlencode(items: list[tuple[str, str]]) -> str: """ We can use a much simpler version of the stdlib urlencode here because we don't need to handle a bunch of different typing cases, such as bytes vs str. diff --git a/contrib/python/httpx/httpx/_urls.py b/contrib/python/httpx/httpx/_urls.py index 26202e95db..43dedd5644 100644 --- a/contrib/python/httpx/httpx/_urls.py +++ b/contrib/python/httpx/httpx/_urls.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import typing from urllib.parse import parse_qs, unquote @@ -70,9 +72,7 @@ class URL: themselves. """ - def __init__( - self, url: typing.Union["URL", str] = "", **kwargs: typing.Any - ) -> None: + def __init__(self, url: URL | str = "", **kwargs: typing.Any) -> None: if kwargs: allowed = { "scheme": str, @@ -213,7 +213,7 @@ class URL: return self._uri_reference.host.encode("ascii") @property - def port(self) -> typing.Optional[int]: + def port(self) -> int | None: """ The URL port as an integer. @@ -270,7 +270,7 @@ class URL: return query.encode("ascii") @property - def params(self) -> "QueryParams": + def params(self) -> QueryParams: """ The URL query parameters, neatly parsed and packaged into an immutable multidict representation. @@ -338,7 +338,7 @@ class URL: """ return not self.is_absolute_url - def copy_with(self, **kwargs: typing.Any) -> "URL": + def copy_with(self, **kwargs: typing.Any) -> URL: """ Copy this URL, returning a new URL with some components altered. Accepts the same set of parameters as the components that are made @@ -353,19 +353,19 @@ class URL: """ return URL(self, **kwargs) - def copy_set_param(self, key: str, value: typing.Any = None) -> "URL": + def copy_set_param(self, key: str, value: typing.Any = None) -> URL: return self.copy_with(params=self.params.set(key, value)) - def copy_add_param(self, key: str, value: typing.Any = None) -> "URL": + def copy_add_param(self, key: str, value: typing.Any = None) -> URL: return self.copy_with(params=self.params.add(key, value)) - def copy_remove_param(self, key: str) -> "URL": + def copy_remove_param(self, key: str) -> URL: return self.copy_with(params=self.params.remove(key)) - def copy_merge_params(self, params: QueryParamTypes) -> "URL": + def copy_merge_params(self, params: QueryParamTypes) -> URL: return self.copy_with(params=self.params.merge(params)) - def join(self, url: URLTypes) -> "URL": + def join(self, url: URLTypes) -> URL: """ Return an absolute URL, using this URL as the base. @@ -420,9 +420,7 @@ class QueryParams(typing.Mapping[str, str]): URL query parameters, as a multi-dict. """ - def __init__( - self, *args: typing.Optional[QueryParamTypes], **kwargs: typing.Any - ) -> None: + def __init__(self, *args: QueryParamTypes | None, **kwargs: typing.Any) -> None: assert len(args) < 2, "Too many arguments." assert not (args and kwargs), "Cannot mix named and unnamed arguments." @@ -434,7 +432,7 @@ class QueryParams(typing.Mapping[str, str]): elif isinstance(value, QueryParams): self._dict = {k: list(v) for k, v in value._dict.items()} else: - dict_value: typing.Dict[typing.Any, typing.List[typing.Any]] = {} + dict_value: dict[typing.Any, list[typing.Any]] = {} if isinstance(value, (list, tuple)): # Convert list inputs like: # [("a", "123"), ("a", "456"), ("b", "789")] @@ -495,7 +493,7 @@ class QueryParams(typing.Mapping[str, str]): """ return {k: v[0] for k, v in self._dict.items()}.items() - def multi_items(self) -> typing.List[typing.Tuple[str, str]]: + def multi_items(self) -> list[tuple[str, str]]: """ Return all items in the query params. Allow duplicate keys to occur. @@ -504,7 +502,7 @@ class QueryParams(typing.Mapping[str, str]): q = httpx.QueryParams("a=123&a=456&b=789") assert list(q.multi_items()) == [("a", "123"), ("a", "456"), ("b", "789")] """ - multi_items: typing.List[typing.Tuple[str, str]] = [] + multi_items: list[tuple[str, str]] = [] for k, v in self._dict.items(): multi_items.extend([(k, i) for i in v]) return multi_items @@ -523,7 +521,7 @@ class QueryParams(typing.Mapping[str, str]): return self._dict[str(key)][0] return default - def get_list(self, key: str) -> typing.List[str]: + def get_list(self, key: str) -> list[str]: """ Get all values from the query param for a given key. @@ -534,7 +532,7 @@ class QueryParams(typing.Mapping[str, str]): """ return list(self._dict.get(str(key), [])) - def set(self, key: str, value: typing.Any = None) -> "QueryParams": + def set(self, key: str, value: typing.Any = None) -> QueryParams: """ Return a new QueryParams instance, setting the value of a key. @@ -549,7 +547,7 @@ class QueryParams(typing.Mapping[str, str]): q._dict[str(key)] = [primitive_value_to_str(value)] return q - def add(self, key: str, value: typing.Any = None) -> "QueryParams": + def add(self, key: str, value: typing.Any = None) -> QueryParams: """ Return a new QueryParams instance, setting or appending the value of a key. @@ -564,7 +562,7 @@ class QueryParams(typing.Mapping[str, str]): q._dict[str(key)] = q.get_list(key) + [primitive_value_to_str(value)] return q - def remove(self, key: str) -> "QueryParams": + def remove(self, key: str) -> QueryParams: """ Return a new QueryParams instance, removing the value of a key. @@ -579,7 +577,7 @@ class QueryParams(typing.Mapping[str, str]): q._dict.pop(str(key), None) return q - def merge(self, params: typing.Optional[QueryParamTypes] = None) -> "QueryParams": + def merge(self, params: QueryParamTypes | None = None) -> QueryParams: """ Return a new QueryParams instance, updated with. @@ -635,7 +633,7 @@ class QueryParams(typing.Mapping[str, str]): query_string = str(self) return f"{class_name}({query_string!r})" - def update(self, params: typing.Optional[QueryParamTypes] = None) -> None: + def update(self, params: QueryParamTypes | None = None) -> None: raise RuntimeError( "QueryParams are immutable since 0.18.0. " "Use `q = q.merge(...)` to create an updated copy." diff --git a/contrib/python/httpx/httpx/_utils.py b/contrib/python/httpx/httpx/_utils.py index bc3cb001dd..a9ece19438 100644 --- a/contrib/python/httpx/httpx/_utils.py +++ b/contrib/python/httpx/httpx/_utils.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import codecs import email.message import ipaddress @@ -27,9 +29,9 @@ _HTML5_FORM_ENCODING_RE = re.compile( def normalize_header_key( - value: typing.Union[str, bytes], + value: str | bytes, lower: bool, - encoding: typing.Optional[str] = None, + encoding: str | None = None, ) -> bytes: """ Coerce str/bytes into a strictly byte-wise HTTP header key. @@ -42,9 +44,7 @@ def normalize_header_key( return bytes_value.lower() if lower else bytes_value -def normalize_header_value( - value: typing.Union[str, bytes], encoding: typing.Optional[str] = None -) -> bytes: +def normalize_header_value(value: str | bytes, encoding: str | None = None) -> bytes: """ Coerce str/bytes into a strictly byte-wise HTTP header value. """ @@ -53,7 +53,7 @@ def normalize_header_value( return value.encode(encoding or "ascii") -def primitive_value_to_str(value: "PrimitiveData") -> str: +def primitive_value_to_str(value: PrimitiveData) -> str: """ Coerce a primitive data type into a string value. @@ -91,7 +91,7 @@ def format_form_param(name: str, value: str) -> bytes: return f'{name}="{value}"'.encode() -def get_ca_bundle_from_env() -> typing.Optional[str]: +def get_ca_bundle_from_env() -> str | None: if "SSL_CERT_FILE" in os.environ: ssl_file = Path(os.environ["SSL_CERT_FILE"]) if ssl_file.is_file(): @@ -103,7 +103,7 @@ def get_ca_bundle_from_env() -> typing.Optional[str]: return None -def parse_header_links(value: str) -> typing.List[typing.Dict[str, str]]: +def parse_header_links(value: str) -> list[dict[str, str]]: """ Returns a list of parsed link headers, for more info see: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link @@ -119,7 +119,7 @@ def parse_header_links(value: str) -> typing.List[typing.Dict[str, str]]: :param value: HTTP Link entity-header field :return: list of parsed link headers """ - links: typing.List[typing.Dict[str, str]] = [] + links: list[dict[str, str]] = [] replace_chars = " '\"" value = value.strip(replace_chars) if not value: @@ -140,7 +140,7 @@ def parse_header_links(value: str) -> typing.List[typing.Dict[str, str]]: return links -def parse_content_type_charset(content_type: str) -> typing.Optional[str]: +def parse_content_type_charset(content_type: str) -> str | None: # We used to use `cgi.parse_header()` here, but `cgi` became a dead battery. # See: https://peps.python.org/pep-0594/#cgi msg = email.message.Message() @@ -152,21 +152,21 @@ SENSITIVE_HEADERS = {"authorization", "proxy-authorization"} def obfuscate_sensitive_headers( - items: typing.Iterable[typing.Tuple[typing.AnyStr, typing.AnyStr]], -) -> typing.Iterator[typing.Tuple[typing.AnyStr, typing.AnyStr]]: + items: typing.Iterable[tuple[typing.AnyStr, typing.AnyStr]], +) -> typing.Iterator[tuple[typing.AnyStr, typing.AnyStr]]: for k, v in items: if to_str(k.lower()) in SENSITIVE_HEADERS: v = to_bytes_or_str("[secure]", match_type_of=v) yield k, v -def port_or_default(url: "URL") -> typing.Optional[int]: +def port_or_default(url: URL) -> int | None: if url.port is not None: return url.port return {"http": 80, "https": 443}.get(url.scheme) -def same_origin(url: "URL", other: "URL") -> bool: +def same_origin(url: URL, other: URL) -> bool: """ Return 'True' if the given URLs share the same origin. """ @@ -177,7 +177,7 @@ def same_origin(url: "URL", other: "URL") -> bool: ) -def is_https_redirect(url: "URL", location: "URL") -> bool: +def is_https_redirect(url: URL, location: URL) -> bool: """ Return 'True' if 'location' is a HTTPS upgrade of 'url' """ @@ -192,7 +192,7 @@ def is_https_redirect(url: "URL", location: "URL") -> bool: ) -def get_environment_proxies() -> typing.Dict[str, typing.Optional[str]]: +def get_environment_proxies() -> dict[str, str | None]: """Gets proxy information from the environment""" # urllib.request.getproxies() falls back on System @@ -200,7 +200,7 @@ def get_environment_proxies() -> typing.Dict[str, typing.Optional[str]]: # We don't want to propagate non-HTTP proxies into # our configuration such as 'TRAVIS_APT_PROXY'. proxy_info = getproxies() - mounts: typing.Dict[str, typing.Optional[str]] = {} + mounts: dict[str, str | None] = {} for scheme in ("http", "https", "all"): if proxy_info.get(scheme): @@ -241,11 +241,11 @@ def get_environment_proxies() -> typing.Dict[str, typing.Optional[str]]: return mounts -def to_bytes(value: typing.Union[str, bytes], encoding: str = "utf-8") -> bytes: +def to_bytes(value: str | bytes, encoding: str = "utf-8") -> bytes: return value.encode(encoding) if isinstance(value, str) else value -def to_str(value: typing.Union[str, bytes], encoding: str = "utf-8") -> str: +def to_str(value: str | bytes, encoding: str = "utf-8") -> str: return value if isinstance(value, str) else value.decode(encoding) @@ -257,13 +257,13 @@ def unquote(value: str) -> str: return value[1:-1] if value[0] == value[-1] == '"' else value -def guess_content_type(filename: typing.Optional[str]) -> typing.Optional[str]: +def guess_content_type(filename: str | None) -> str | None: if filename: return mimetypes.guess_type(filename)[0] or "application/octet-stream" return None -def peek_filelike_length(stream: typing.Any) -> typing.Optional[int]: +def peek_filelike_length(stream: typing.Any) -> int | None: """ Given a file-like stream object, return its length in number of bytes without reading it into memory. @@ -373,7 +373,7 @@ class URLPattern: self.host = "" if url.host == "*" else url.host self.port = url.port if not url.host or url.host == "*": - self.host_regex: typing.Optional[typing.Pattern[str]] = None + self.host_regex: typing.Pattern[str] | None = None elif url.host.startswith("*."): # *.example.com should match "www.example.com", but not "example.com" domain = re.escape(url.host[2:]) @@ -387,7 +387,7 @@ class URLPattern: domain = re.escape(url.host) self.host_regex = re.compile(f"^{domain}$") - def matches(self, other: "URL") -> bool: + def matches(self, other: URL) -> bool: if self.scheme and self.scheme != other.scheme: return False if ( @@ -401,7 +401,7 @@ class URLPattern: return True @property - def priority(self) -> typing.Tuple[int, int, int]: + def priority(self) -> tuple[int, int, int]: """ The priority allows URLPattern instances to be sortable, so that we can match from most specific to least specific. @@ -417,7 +417,7 @@ class URLPattern: def __hash__(self) -> int: return hash(self.pattern) - def __lt__(self, other: "URLPattern") -> bool: + def __lt__(self, other: URLPattern) -> bool: return self.priority < other.priority def __eq__(self, other: typing.Any) -> bool: diff --git a/contrib/python/httpx/ya.make b/contrib/python/httpx/ya.make index 2f8eaa9087..fe32d75035 100644 --- a/contrib/python/httpx/ya.make +++ b/contrib/python/httpx/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(0.26.0) +VERSION(0.27.0) LICENSE(BSD-3-Clause) diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA index 675a1d8825..71ddeb7b6f 100644 --- a/contrib/python/ydb/py3/.dist-info/METADATA +++ b/contrib/python/ydb/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: ydb -Version: 3.8.0 +Version: 3.8.1 Summary: YDB Python SDK Home-page: http://github.com/ydb-platform/ydb-python-sdk Author: Yandex LLC diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make index 484d68aafd..24ba22bc0f 100644 --- a/contrib/python/ydb/py3/ya.make +++ b/contrib/python/ydb/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(3.8.0) +VERSION(3.8.1) LICENSE(Apache-2.0) diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py index 803309ff6d..98bcc5d2ec 100644 --- a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py +++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py @@ -202,7 +202,7 @@ class GrpcWrapperAsyncIO(IGrpcWrapperAsyncIO): # todo handle grpc exceptions and convert it to internal exceptions try: grpc_message = await self.from_server_grpc.__anext__() - except grpc.RpcError as e: + except (grpc.RpcError, grpc.aio.AioRpcError) as e: raise connection._rpc_error_handler(self._connection_state, e) issues._process_response(grpc_message) diff --git a/contrib/python/ydb/py3/ydb/connection.py b/contrib/python/ydb/py3/ydb/connection.py index 1c4bd9c78d..8e65cd3b83 100644 --- a/contrib/python/ydb/py3/ydb/connection.py +++ b/contrib/python/ydb/py3/ydb/connection.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- import logging import copy +import typing from concurrent import futures import uuid import threading @@ -61,7 +62,11 @@ def _log_request(rpc_state, request): logger.debug("%s: request = { %s }", rpc_state, _message_to_string(request)) -def _rpc_error_handler(rpc_state, rpc_error, on_disconnected=None): +def _rpc_error_handler( + rpc_state, + rpc_error: typing.Union[grpc.RpcError, grpc.aio.AioRpcError, grpc.Call, grpc.aio.Call], + on_disconnected: typing.Callable[[], None] = None, +): """ RPC call error handler, that translates gRPC error into YDB issue :param rpc_state: A state of rpc @@ -69,7 +74,7 @@ def _rpc_error_handler(rpc_state, rpc_error, on_disconnected=None): :param on_disconnected: a handler to call on disconnected connection """ logger.info("%s: received error, %s", rpc_state, rpc_error) - if isinstance(rpc_error, grpc.Call): + if isinstance(rpc_error, (grpc.RpcError, grpc.aio.AioRpcError, grpc.Call, grpc.aio.Call)): if rpc_error.code() == grpc.StatusCode.UNAUTHENTICATED: return issues.Unauthenticated(rpc_error.details()) elif rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED: diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py index 8173e1e609..a464e6dc07 100644 --- a/contrib/python/ydb/py3/ydb/ydb_version.py +++ b/contrib/python/ydb/py3/ydb/ydb_version.py @@ -1 +1 @@ -VERSION = "3.8.0" +VERSION = "3.8.1" diff --git a/yt/yt/client/table_client/config.cpp b/yt/yt/client/table_client/config.cpp index d8629254d5..863672915b 100644 --- a/yt/yt/client/table_client/config.cpp +++ b/yt/yt/client/table_client/config.cpp @@ -285,7 +285,7 @@ void TDictionaryCompressionConfig::Register(TRegistrar registrar) .Default(12_MB); registrar.Parameter("max_acceptable_compression_ratio", &TThis::MaxAcceptableCompressionRatio) .Default(0.7) - .InRange(0, 1); + .GreaterThanOrEqual(0.); registrar.Parameter("max_decompression_blob_size", &TThis::MaxDecompressionBlobSize) .GreaterThan(0) .Default(64_MB); diff --git a/yt/yt/core/actions/invoker_pool.h b/yt/yt/core/actions/invoker_pool.h index c7c6afe0a8..a467e5ce2c 100644 --- a/yt/yt/core/actions/invoker_pool.h +++ b/yt/yt/core/actions/invoker_pool.h @@ -61,6 +61,7 @@ public: { i64 EnqueuedActionCount = 0; i64 DequeuedActionCount = 0; + i64 ExecutedActionCount = 0; i64 WaitingActionCount = 0; TDuration TotalTimeEstimate; }; diff --git a/yt/yt/core/concurrency/fair_share_invoker_pool-inl.h b/yt/yt/core/concurrency/fair_share_invoker_pool-inl.h new file mode 100644 index 0000000000..69be67ae68 --- /dev/null +++ b/yt/yt/core/concurrency/fair_share_invoker_pool-inl.h @@ -0,0 +1,40 @@ +#ifndef FAIR_SHARE_INVOKER_POOL_INL_H_ +#error "Direct inclusion of this file is not allowed, include fair_share_invoker_pool.h" +// For the sake of sane code completion. +#include "fair_share_invoker_pool.h" +#endif + +namespace NYT::NConcurrency { + +//////////////////////////////////////////////////////////////////////////////// + +template <class EInvoker> + requires TEnumTraits<EInvoker>::IsEnum +TDiagnosableInvokerPoolPtr CreateEnumIndexedProfiledFairShareInvokerPool( + IInvokerPtr underlyingInvoker, + TFairShareCallbackQueueFactory callbackQueueFactory, + TDuration actionTimeRelevancyHalflife, + const TString& poolName, + NProfiling::IRegistryImplPtr registry) +{ + using TTraits = TEnumTraits<EInvoker>; + + std::vector<TString> bucketNames; + bucketNames.reserve(TTraits::GetDomainSize()); + + for (const auto& enumName : TTraits::GetDomainNames()) { + bucketNames.emplace_back(enumName); + } + + return CreateProfiledFairShareInvokerPool( + std::move(underlyingInvoker), + std::move(callbackQueueFactory), + actionTimeRelevancyHalflife, + poolName, + bucketNames, + std::move(registry)); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NConcurrency diff --git a/yt/yt/core/concurrency/fair_share_invoker_pool.cpp b/yt/yt/core/concurrency/fair_share_invoker_pool.cpp index 99f231ebbd..7841446b7e 100644 --- a/yt/yt/core/concurrency/fair_share_invoker_pool.cpp +++ b/yt/yt/core/concurrency/fair_share_invoker_pool.cpp @@ -1,4 +1,5 @@ #include "fair_share_invoker_pool.h" +#include "profiling_helpers.h" #include <yt/yt/core/actions/current_invoker.h> #include <yt/yt/core/actions/invoker_detail.h> @@ -6,8 +7,13 @@ #include <yt/yt/core/misc/finally.h> #include <yt/yt/core/misc/ring_queue.h> +#include <yt/yt/core/profiling/public.h> #include <yt/yt/core/profiling/timing.h> +#include <yt/yt/library/profiling/sensor.h> + +#include <yt/yt/library/ytprof/api/api.h> + #include <library/cpp/yt/memory/weak_ptr.h> #include <library/cpp/yt/threading/rw_spin_lock.h> @@ -19,6 +25,19 @@ namespace NYT::NConcurrency { using namespace NProfiling; +using namespace NYTProf; + +//////////////////////////////////////////////////////////////////////////////// + +constinit YT_THREAD_LOCAL(TCpuProfilerTagGuard) FairShareInvokerPoolProfilerTagGuard; + +//////////////////////////////////////////////////////////////////////////////// + +#if defined(_unix_) + #define NO_UNIQUE_ADDRESS [[no_unique_address]] +#else + #define NO_UNIQUE_ADDRESS +#endif //////////////////////////////////////////////////////////////////////////////// @@ -120,23 +139,207 @@ IFairShareCallbackQueuePtr CreateFairShareCallbackQueue(int bucketCount) //////////////////////////////////////////////////////////////////////////////// +template <bool EnableProfiling> +class TFairShareInvokerPoolProfiler +{ +public: + using TObject = TFairShareInvokerPoolProfiler; + + class THandle + { + public: + void ProfileEnqueue() + { } + + void ProfileDequeue(TDuration /*waitTime*/) + { } + + void ProfileExecutionFinish(TDuration /*execTime*/, TDuration /*totalTime*/) + { } + }; + + static TObject Create( + const TString& /*poolName*/, + std::vector<TString> /*bucketNames*/, + IRegistryImplPtr /*registry*/) + { + return {}; + } + + static THandle MakeHandle(const TObject& /*self*/, int /*handleIndex*/) + { + return THandle(); + } +}; + +template <> +class TFairShareInvokerPoolProfiler<true> + : public TRefCounted +{ +public: + using TObject = TIntrusivePtr<TFairShareInvokerPoolProfiler>; + + class THandle + { + public: + void ProfileEnqueue() + { + Profiler_->ProfileEnqueue(HandleIndex_); + } + + void ProfileDequeue(TDuration waitTime) + { + Profiler_->ProfileDequeue(HandleIndex_, waitTime); + } + + void ProfileExecutionFinish(TDuration execTime, TDuration totalTime) + { + Profiler_->ProfileExecutionFinish(HandleIndex_, execTime, totalTime); + } + + private: + friend class TFairShareInvokerPoolProfiler; + + TObject Profiler_; + int HandleIndex_; + + THandle(TObject profiler, int handleIndex) + : Profiler_(std::move(profiler)) + , HandleIndex_(handleIndex) + { } + }; + + static TObject Create( + const TString& poolName, + std::vector<TString> bucketNames, + IRegistryImplPtr registry) + { + return New<TFairShareInvokerPoolProfiler>(poolName, std::move(bucketNames), std::move(registry)); + } + + static THandle MakeHandle(const TObject& self, int handleIndex) + { + return THandle(self, handleIndex); + } + +private: + friend class THandle; + + DECLARE_NEW_FRIEND(); + + std::vector<TProfilerTagPtr> BucketProfilerTags_; + + struct TCounters + { + NProfiling::TCounter EnqueuedCounter; + NProfiling::TCounter DequeuedCounter; + NProfiling::TEventTimer WaitTimer; + NProfiling::TEventTimer ExecTimer; + NProfiling::TTimeCounter CumulativeTimeCounter; + NProfiling::TEventTimer TotalTimer; + std::atomic<int> ActiveCallbacks = 0; + }; + + using TCountersPtr = std::unique_ptr<TCounters>; + + std::vector<TCountersPtr> Counters_; + + TFairShareInvokerPoolProfiler( + const TString& poolName, + std::vector<TString> bucketNames, + IRegistryImplPtr registry) + { + Counters_.reserve(std::ssize(bucketNames)); + BucketProfilerTags_.reserve(std::ssize(bucketNames)); + + for (const auto& bucketName : bucketNames) { + Counters_.push_back(CreateCounters(GetBucketTags(poolName, bucketName), registry)); + BucketProfilerTags_.push_back(New<TProfilerTag>("bucket", bucketName)); + } + } + + TCountersPtr CreateCounters(const TTagSet& tagSet, const IRegistryImplPtr& registry) { + auto profiler = TProfiler(registry, "/fair_share_invoker_pool").WithTags(tagSet).WithHot(); + + auto counters = std::make_unique<TCounters>(); + counters->EnqueuedCounter = profiler.Counter("/enqueued"); + counters->DequeuedCounter = profiler.Counter("/dequeued"); + counters->WaitTimer = profiler.Timer("/time/wait"); + counters->ExecTimer = profiler.Timer("/time/exec"); + counters->CumulativeTimeCounter = profiler.TimeCounter("/time/cumulative"); + counters->TotalTimer = profiler.Timer("/time/total"); + + profiler.AddFuncGauge( + "/size", + MakeStrong(this), + [counters = counters.get()] { + return counters->ActiveCallbacks.load(std::memory_order::relaxed); + }); + + return counters; + } + + void ProfileEnqueue(int index) + { + auto& counters = Counters_[index]; + if (counters) { + counters->ActiveCallbacks.fetch_add(1, std::memory_order::relaxed); + counters->EnqueuedCounter.Increment(); + } + } + + void ProfileDequeue(int index, TDuration waitTime) + { + auto& counters = Counters_[index]; + if (counters) { + counters->DequeuedCounter.Increment(); + counters->WaitTimer.Record(waitTime); + } + + GetTlsRef(FairShareInvokerPoolProfilerTagGuard) = TCpuProfilerTagGuard(BucketProfilerTags_[index]); + } + + void ProfileExecutionFinish(int index, TDuration execTime, TDuration totalTime) + { + GetTlsRef(FairShareInvokerPoolProfilerTagGuard) = TCpuProfilerTagGuard{}; + + auto& counters = Counters_[index]; + if (counters) { + counters->ExecTimer.Record(execTime); + counters->CumulativeTimeCounter.Add(execTime); + counters->TotalTimer.Record(totalTime); + counters->ActiveCallbacks.fetch_sub(1, std::memory_order::relaxed); + } + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +template <bool EnableProfiling> class TFairShareInvokerPool : public TDiagnosableInvokerPool { + using TPoolProfiler = TFairShareInvokerPoolProfiler<EnableProfiling>; + using TPoolProfilerObject = typename TPoolProfiler::TObject; + public: TFairShareInvokerPool( IInvokerPtr underlyingInvoker, int invokerCount, TFairShareCallbackQueueFactory callbackQueueFactory, - TDuration actionTimeRelevancyHalflife) + TDuration actionTimeRelevancyHalflife, + const TString& poolName = "", + std::vector<TString> bucketNames = {}, + IRegistryImplPtr registry = nullptr) : UnderlyingInvoker_(std::move(underlyingInvoker)) , Queue_(callbackQueueFactory(invokerCount)) + , Profiler_(TPoolProfiler::Create(poolName, std::move(bucketNames), std::move(registry))) { Invokers_.reserve(invokerCount); InvokerQueueStates_.reserve(invokerCount); for (int index = 0; index < invokerCount; ++index) { Invokers_.push_back(New<TInvoker>(UnderlyingInvoker_, index, MakeWeak(this))); - InvokerQueueStates_.emplace_back(actionTimeRelevancyHalflife); + InvokerQueueStates_.emplace_back(actionTimeRelevancyHalflife, TPoolProfiler::MakeHandle(Profiler_, index)); } } @@ -197,16 +400,33 @@ private: class TInvokerQueueState { + using THandle = typename TPoolProfiler::THandle; + public: explicit TInvokerQueueState( - TDuration halflife) + TDuration halflife, + THandle profilerHandle) : AverageTimeAggregator_(halflife) + , ProfilerHandle_(std::move(profilerHandle)) { } void OnActionEnqueued(TInstant now) { ActionEnqueueTimes_.push(now); ++EnqueuedActionCount_; + ProfilerHandle_.ProfileEnqueue(); + } + + //! We do not remove any enqueue times now because we + //! enjoy invariant ActionEnqueueTimes_.empty() iff + //! no action running. + void OnActionDequeued(TInstant now) + { + YT_VERIFY(!ActionEnqueueTimes_.empty()); + ++DequeuedActionCount_; + + auto waitTime = now - ActionEnqueueTimes_.front(); + ProfilerHandle_.ProfileDequeue(waitTime); } //! NB: We report action after execution and not after dequeue because @@ -216,15 +436,17 @@ private: //! the next time we try to check if we should enqueue action. //! This will result in more actions stuck in queue than needed //! to determine whether or not invoker is frozen. - void OnActionExecuted(TInstant now) + void OnActionExecuted(TInstant now, TInstant dequeueTime) { YT_VERIFY(!ActionEnqueueTimes_.empty()); - ++DequeuedActionCount_; + ++ExecutedActionCount_; - auto totalWaitTime = now - ActionEnqueueTimes_.front(); + auto execTime = now - dequeueTime; + auto totalTime = now - ActionEnqueueTimes_.front(); ActionEnqueueTimes_.pop(); - AverageTimeAggregator_.UpdateAt(now, totalWaitTime.MillisecondsFloat()); + AverageTimeAggregator_.UpdateAt(now, totalTime.MillisecondsFloat()); + ProfilerHandle_.ProfileExecutionFinish(execTime, totalTime); } TInvokerStatistics GetInvokerStatistics(TInstant now) const @@ -234,6 +456,7 @@ private: return TInvokerStatistics{ .EnqueuedActionCount = EnqueuedActionCount_, .DequeuedActionCount = DequeuedActionCount_, + .ExecutedActionCount = ExecutedActionCount_, .WaitingActionCount = waitingActionCount, .TotalTimeEstimate = GetTotalTimeEstimate(now), }; @@ -250,6 +473,9 @@ private: i64 EnqueuedActionCount_ = 0; i64 DequeuedActionCount_ = 0; + i64 ExecutedActionCount_ = 0; + + NO_UNIQUE_ADDRESS THandle ProfilerHandle_; TDuration GetTotalTimeEstimate(TInstant now) const { @@ -276,6 +502,8 @@ private: IFairShareCallbackQueuePtr Queue_; + NO_UNIQUE_ADDRESS TPoolProfilerObject Profiler_; + class TCpuTimeAccounter { public: @@ -344,6 +572,14 @@ private: YT_VERIFY(Queue_->TryDequeue(&callback, &bucketIndex)); YT_VERIFY(IsValidInvokerIndex(bucketIndex)); + TInstant dequeueTime = GetInstant(); + + if constexpr (EnableProfiling) { + auto guard = Guard(InvokerQueueStatesLock_); + auto& queueState = InvokerQueueStates_[bucketIndex]; + queueState.OnActionDequeued(dequeueTime); + } + //! NB1: Finally causes us to count total time (wait + execution) in our statistics. //! This is done to compensate for the following situation: //! Consider the first task in the batch @@ -365,7 +601,7 @@ private: auto guard = Guard(InvokerQueueStatesLock_); auto& queueState = InvokerQueueStates_[bucketIndex]; - queueState.OnActionExecuted(now); + queueState.OnActionExecuted(now, dequeueTime); }); { @@ -385,7 +621,7 @@ TDiagnosableInvokerPoolPtr CreateFairShareInvokerPool( TDuration actionTimeRelevancyHalflife) { YT_VERIFY(0 < invokerCount && invokerCount < 100); - return New<TFairShareInvokerPool>( + return New<TFairShareInvokerPool</*EnableProfiling*/ false>>( std::move(underlyingInvoker), invokerCount, std::move(callbackQueueFactory), @@ -394,4 +630,26 @@ TDiagnosableInvokerPoolPtr CreateFairShareInvokerPool( //////////////////////////////////////////////////////////////////////////////// +TDiagnosableInvokerPoolPtr CreateProfiledFairShareInvokerPool( + IInvokerPtr underlyingInvoker, + TFairShareCallbackQueueFactory callbackQueueFactory, + TDuration actionTimeRelevancyHalflife, + const TString& poolName, + std::vector<TString> bucketNames, + IRegistryImplPtr registry) +{ + YT_VERIFY(0 < std::ssize(bucketNames) && std::ssize(bucketNames) < 100); + + return New<TFairShareInvokerPool</*EnableProfiling*/ true>>( + std::move(underlyingInvoker), + std::ssize(bucketNames), + std::move(callbackQueueFactory), + actionTimeRelevancyHalflife, + poolName, + std::move(bucketNames), + std::move(registry)); +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NConcurrency diff --git a/yt/yt/core/concurrency/fair_share_invoker_pool.h b/yt/yt/core/concurrency/fair_share_invoker_pool.h index 95c3c07b97..ad3d27783f 100644 --- a/yt/yt/core/concurrency/fair_share_invoker_pool.h +++ b/yt/yt/core/concurrency/fair_share_invoker_pool.h @@ -57,4 +57,34 @@ TDiagnosableInvokerPoolPtr CreateFairShareInvokerPool( //////////////////////////////////////////////////////////////////////////////// +//! Creates invoker pool from above with invokerCount = bucketNames.size() +//! And adds profiling on top of it. + +TDiagnosableInvokerPoolPtr CreateProfiledFairShareInvokerPool( + IInvokerPtr underlyingInvoker, + TFairShareCallbackQueueFactory callbackQueueFactory = CreateFairShareCallbackQueue, + TDuration actionTimeRelevancyHalflife = TAdjustedExponentialMovingAverage::DefaultHalflife, + const TString& poolName = "fair_share_invoker_pool", + std::vector<TString> bucketNames = {}, + NProfiling::IRegistryImplPtr registry = nullptr); + +//////////////////////////////////////////////////////////////////////////////// + +//! Same as above but bucket names are derived from EInvoker domain values. + +template <class EInvoker> + requires TEnumTraits<EInvoker>::IsEnum +TDiagnosableInvokerPoolPtr CreateEnumIndexedProfiledFairShareInvokerPool( + IInvokerPtr underlyingInvoker, + TFairShareCallbackQueueFactory callbackQueueFactory = CreateFairShareCallbackQueue, + TDuration actionTimeRelevancyHalflife = TAdjustedExponentialMovingAverage::DefaultHalflife, + const TString& poolName = "fair_share_invoker_pool", + NProfiling::IRegistryImplPtr registry = nullptr); + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NConcurrency + +#define FAIR_SHARE_INVOKER_POOL_INL_H_ +#include "fair_share_invoker_pool-inl.h" +#undef FAIR_SHARE_INVOKER_POOL_INL_H_ diff --git a/yt/yt/core/concurrency/unittests/profiled_fair_share_invoker_pool_ut.cpp b/yt/yt/core/concurrency/unittests/profiled_fair_share_invoker_pool_ut.cpp new file mode 100644 index 0000000000..89b84ad4ea --- /dev/null +++ b/yt/yt/core/concurrency/unittests/profiled_fair_share_invoker_pool_ut.cpp @@ -0,0 +1,908 @@ +#include <yt/yt/core/test_framework/framework.h> + +#include <yt/yt/core/profiling/timing.h> + +#include <yt/yt/core/concurrency/action_queue.h> +#include <yt/yt/core/concurrency/delayed_executor.h> +#include <yt/yt/core/concurrency/fair_share_invoker_pool.h> +#include <yt/yt/core/concurrency/scheduler.h> +#include <yt/yt/core/concurrency/thread_pool.h> + +#include <yt/yt/core/misc/collection_helpers.h> + +#include <yt/yt/core/misc/lazy_ptr.h> + +#include <yt/yt/library/profiling/solomon/exporter.h> + +#include <util/datetime/base.h> + +#include <algorithm> +#include <array> +#include <ranges> +#include <utility> + +namespace NYT::NConcurrency { +namespace { + +using namespace NProfiling; + +//////////////////////////////////////////////////////////////////////////////// + +constexpr auto Margin = TDuration::MilliSeconds(1); +constexpr auto Quantum = TDuration::MilliSeconds(100); + +//////////////////////////////////////////////////////////////////////////////// + +class TMockFairShareCallbackQueue + : public IFairShareCallbackQueue +{ +public: + explicit TMockFairShareCallbackQueue(int bucketCount) + : UnderlyingCallbackQueue_(CreateFairShareCallbackQueue(bucketCount)) + , TotalCpuTime_(bucketCount) + { } + + void Enqueue(TClosure callback, int bucketIndex) override + { + UnderlyingCallbackQueue_->Enqueue(std::move(callback), bucketIndex); + } + + bool TryDequeue(TClosure* resultCallback, int* resultBucketIndex) override + { + return UnderlyingCallbackQueue_->TryDequeue(resultCallback, resultBucketIndex); + } + + void AccountCpuTime(int bucketIndex, NProfiling::TCpuDuration cpuTime) override + { + YT_VERIFY(IsValidBucketIndex(bucketIndex)); + TotalCpuTime_[bucketIndex] += cpuTime; + UnderlyingCallbackQueue_->AccountCpuTime(bucketIndex, cpuTime); + } + + NProfiling::TCpuDuration GetTotalCpuTime(int bucketIndex) const + { + YT_VERIFY(IsValidBucketIndex(bucketIndex)); + return TotalCpuTime_[bucketIndex]; + } + +private: + const IFairShareCallbackQueuePtr UnderlyingCallbackQueue_; + std::vector<std::atomic<NProfiling::TCpuDuration>> TotalCpuTime_; + + bool IsValidBucketIndex(int bucketIndex) const + { + return 0 <= bucketIndex && bucketIndex < std::ssize(TotalCpuTime_); + } +}; + +using TMockFairShareCallbackQueuePtr = TIntrusivePtr<TMockFairShareCallbackQueue>; + +//////////////////////////////////////////////////////////////////////////////// + +class TProfiledFairShareInvokerPoolTest + : public ::testing::Test +{ +protected: + std::array<TLazyIntrusivePtr<TActionQueue>, 2> Queues_; + + THashMap<IInvoker*, int> InvokerToIndex_; + + TMockFairShareCallbackQueuePtr MockCallbackQueue; + + struct TInvocationOrder + { + YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, Lock_); + std::vector<int> InvokerIndexes_; + } InvocationOrder_; + + void TearDown() override + { + for (int i = 0; i < std::ssize(Queues_); ++i) { + if (Queues_[i]) { + Queues_[i]->Shutdown(); + } + } + } + + template <typename TInvokerPoolPtr> + void InitializeInvokerToIndexMapping(const TInvokerPoolPtr& invokerPool, int invokerCount) + { + YT_VERIFY(invokerCount > 0); + InvokerToIndex_.clear(); + for (int i = 0; i < invokerCount; ++i) { + auto invoker = invokerPool->GetInvoker(i); + InvokerToIndex_[invoker.Get()] = i; + } + } + + int GetInvokerIndex(IInvoker* invokerAddress) const + { + return GetOrCrash(InvokerToIndex_, invokerAddress); + } + + int GetCurrentInvokerIndex() const + { + return GetInvokerIndex(GetCurrentInvoker()); + } + + void ClearInvocationOrder() + { + auto guard = Guard(InvocationOrder_.Lock_); + InvocationOrder_.InvokerIndexes_.clear(); + } + + void PushInvokerIndexToInvocationOrder() + { + auto currentInvokerIndex = GetCurrentInvokerIndex(); + auto guard = Guard(InvocationOrder_.Lock_); + InvocationOrder_.InvokerIndexes_.push_back(currentInvokerIndex); + } + + std::vector<int> GetInvocationOrder() + { + auto guard = Guard(InvocationOrder_.Lock_); + return InvocationOrder_.InvokerIndexes_; + } + + TDiagnosableInvokerPoolPtr CreateInvokerPool(IInvokerPtr underlyingInvoker, int invokerCount, TSolomonRegistryPtr registry = nullptr) + { + std::vector<TString> bucketNames; + + for (int i = 0; i < invokerCount; ++i) { + bucketNames.push_back("invoker_" + ToString(i)); + } + auto result = CreateProfiledFairShareInvokerPool( + std::move(underlyingInvoker), + [this] (int bucketCount) { + YT_VERIFY(bucketCount > 0); + MockCallbackQueue = New<TMockFairShareCallbackQueue>(bucketCount); + return MockCallbackQueue; + }, + TAdjustedExponentialMovingAverage::DefaultHalflife, + "TestInvokerPool", + bucketNames, + std::move(registry)); + InitializeInvokerToIndexMapping(result, invokerCount); + return result; + } + + void ExpectInvokerIndex(int invokerIndex) + { + EXPECT_EQ(invokerIndex, GetCurrentInvokerIndex()); + } + + void ExpectTotalCpuTime(int bucketIndex, TDuration expectedCpuTime, TDuration precision = Quantum / 2) + { + // Push dummy callback to the scheduler queue and synchronously wait for it + // to ensure that all possible CPU time accounters were destroyed during fiber stack unwinding. + for (int i = 0; i < std::ssize(Queues_); ++i) { + if (Queues_[i]) { + auto invoker = Queues_[i]->GetInvoker(); + BIND([] { }).AsyncVia(invoker).Run().Get().ThrowOnError(); + } + } + + auto precisionValue = NProfiling::DurationToValue(precision); + auto expectedValue = NProfiling::DurationToValue(expectedCpuTime); + auto actualValue = NProfiling::CpuDurationToValue(MockCallbackQueue->GetTotalCpuTime(bucketIndex)); + EXPECT_GT(precisionValue, std::abs(expectedValue - actualValue)); + } + + void DoTestFairness(IInvokerPoolPtr invokerPool, int invokerCount) + { + YT_VERIFY(1 < invokerCount && invokerCount < 5); + + // Each invoker executes some number of callbacks of the same duration |Quantum * (2 ^ #invokerIndex)|. + // Individual duration of callback and number of callbacks chosen + // such that total duration is same for all invokers. + auto getWeight = [] (int invokerIndex) { + return (1 << invokerIndex); + }; + auto getSpinDuration = [getWeight] (int invokerIndex) { + return Quantum * getWeight(invokerIndex); + }; + auto getCallbackCount = [getWeight, invokerCount] (int invokerIndex) { + // Weights are supposed to be in the ascending order. + return 4 * getWeight(invokerCount - 1) / getWeight(invokerIndex); + }; + + std::vector<TFuture<void>> futures; + for (int i = 0; i < invokerCount; ++i) { + for (int j = 0, callbackCount = getCallbackCount(i); j < callbackCount; ++j) { + futures.push_back( + BIND([this, spinDuration = getSpinDuration(i)] { + PushInvokerIndexToInvocationOrder(); + Spin(spinDuration); + }).AsyncVia(invokerPool->GetInvoker(i)).Run()); + } + } + + AllSucceeded(futures).Get().ThrowOnError(); + + auto invocationOrder = GetInvocationOrder(); + + // Test is considered successful if at any moment of the execution + // deviation of the weighted count of executed callbacks per invoker + // is not greater than the threshold (see in the code below). + std::vector<int> invocationCount(invokerCount); + for (auto invokerIndex : invocationOrder) { + YT_VERIFY(0 <= invokerIndex && invokerIndex < invokerCount); + + ++invocationCount[invokerIndex]; + + auto getWeightedInvocationCount = [getWeight, &invocationCount] (int invokerIndex) { + return invocationCount[invokerIndex] * getWeight(invokerIndex); + }; + + auto minWeightedInvocationCount = getWeightedInvocationCount(0); + auto maxWeightedInvocationCount = minWeightedInvocationCount; + for (int i = 0; i < invokerCount; ++i) { + auto weightedInvocationCount = getWeightedInvocationCount(i); + minWeightedInvocationCount = std::min(minWeightedInvocationCount, weightedInvocationCount); + maxWeightedInvocationCount = std::max(maxWeightedInvocationCount, weightedInvocationCount); + } + + // Compare threshold and deviation. + EXPECT_GE(getWeight(invokerCount - 1), maxWeightedInvocationCount - minWeightedInvocationCount); + } + + for (int i = 0; i < invokerCount; ++i) { + EXPECT_EQ(getCallbackCount(i), invocationCount[i]); + } + } + + void DoTestFairness(int invokerCount) + { + DoTestFairness( + CreateInvokerPool(Queues_[0]->GetInvoker(), invokerCount), + invokerCount); + } + + void DoTestSwitchTo(int switchToCount) + { + YT_VERIFY(switchToCount > 0); + + auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), switchToCount + 1); + + auto callback = BIND([this, invokerPool, switchToCount] () { + for (int i = 1; i <= switchToCount; ++i) { + ExpectInvokerIndex(i - 1); + Spin(Quantum * i); + SwitchTo(invokerPool->GetInvoker(i)); + } + ExpectInvokerIndex(switchToCount); + Spin(Quantum * (switchToCount + 1)); + }).AsyncVia(invokerPool->GetInvoker(0)); + + callback.Run().Get().ThrowOnError(); + + for (int i = 0; i <= switchToCount; ++i) { + ExpectTotalCpuTime(i, Quantum * (i + 1)); + } + } + + void DoTestWaitFor(int waitForCount) + { + YT_VERIFY(waitForCount > 0); + + auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), 2); + + auto callback = BIND([waitForCount] { + Spin(Quantum); + for (int i = 0; i < waitForCount; ++i) { + TDelayedExecutor::WaitForDuration(Quantum); + Spin(Quantum); + } + }).AsyncVia(invokerPool->GetInvoker(0)); + + callback.Run().Get().ThrowOnError(); + + ExpectTotalCpuTime(0, Quantum * (waitForCount + 1)); + ExpectTotalCpuTime(1, TDuration::Zero()); + } + + static void Spin(TDuration duration) + { + NProfiling::TFiberWallTimer timer; + while (timer.GetElapsedTime() < duration) { + } + } +}; + +TEST_F(TProfiledFairShareInvokerPoolTest, Fairness2) +{ + DoTestFairness(2); +} + +TEST_F(TProfiledFairShareInvokerPoolTest, Fairness3) +{ + DoTestFairness(3); +} + +TEST_F(TProfiledFairShareInvokerPoolTest, Fairness4) +{ + DoTestFairness(4); +} + +TEST_F(TProfiledFairShareInvokerPoolTest, SwitchTo12) +{ + DoTestSwitchTo(1); +} + +TEST_F(TProfiledFairShareInvokerPoolTest, SwitchTo123) +{ + DoTestSwitchTo(2); +} + +TEST_F(TProfiledFairShareInvokerPoolTest, SwitchTo1234) +{ + DoTestSwitchTo(3); +} + +TEST_F(TProfiledFairShareInvokerPoolTest, SwitchTo121) +{ + auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), 2); + + auto callback = BIND([this, invokerPool] { + SwitchTo(invokerPool->GetInvoker(0)); + ExpectInvokerIndex(0); + Spin(Quantum); + + SwitchTo(invokerPool->GetInvoker(1)); + ExpectInvokerIndex(1); + Spin(Quantum * 3); + + SwitchTo(invokerPool->GetInvoker(0)); + ExpectInvokerIndex(0); + Spin(Quantum); + }).AsyncVia(invokerPool->GetInvoker(0)); + + callback.Run().Get().ThrowOnError(); + + ExpectTotalCpuTime(0, Quantum * 2); + ExpectTotalCpuTime(1, Quantum * 3); +} + +TEST_F(TProfiledFairShareInvokerPoolTest, SwitchTo111AndSwitchTo222) +{ + auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), 2); + + std::vector<TFuture<void>> futures; + + futures.push_back( + BIND([this] { + ExpectInvokerIndex(0); + Spin(Quantum); + SwitchTo(GetCurrentInvoker()); + + ExpectInvokerIndex(0); + Spin(Quantum); + SwitchTo(GetCurrentInvoker()); + + ExpectInvokerIndex(0); + Spin(Quantum); + }).AsyncVia(invokerPool->GetInvoker(0)).Run()); + + futures.push_back( + BIND([this] { + ExpectInvokerIndex(1); + Spin(Quantum); + SwitchTo(GetCurrentInvoker()); + + ExpectInvokerIndex(1); + Spin(Quantum); + SwitchTo(GetCurrentInvoker()); + + ExpectInvokerIndex(1); + Spin(Quantum); + }).AsyncVia(invokerPool->GetInvoker(1)).Run()); + + AllSucceeded(futures).Get().ThrowOnError(); + + ExpectTotalCpuTime(0, Quantum * 3); + ExpectTotalCpuTime(1, Quantum * 3); +} + +TEST_F(TProfiledFairShareInvokerPoolTest, WaitFor1) +{ + DoTestWaitFor(1); +} + +TEST_F(TProfiledFairShareInvokerPoolTest, WaitFor2) +{ + DoTestWaitFor(2); +} + +TEST_F(TProfiledFairShareInvokerPoolTest, WaitFor3) +{ + DoTestWaitFor(3); +} + +TEST_F(TProfiledFairShareInvokerPoolTest, CpuTimeAccountingBetweenContextSwitchesIsNotSupportedYet) +{ + auto threadPool = CreateThreadPool(2, "ThreadPool"); + auto invokerPool = CreateInvokerPool(threadPool->GetInvoker(), 2); + + NThreading::TEvent started; + + // Start busy loop in the first thread via first fair share invoker. + auto future = BIND([this, &started] { + Spin(Quantum * 10); + + auto invocationOrder = GetInvocationOrder(); + EXPECT_TRUE(invocationOrder.empty()); + + started.NotifyOne(); + + Spin(Quantum * 50); + + invocationOrder = GetInvocationOrder(); + EXPECT_TRUE(!invocationOrder.empty()); + }).AsyncVia(invokerPool->GetInvoker(0)).Run(); + + YT_VERIFY(started.Wait(Quantum * 100)); + + // After 10 quantums of time (see notification of the #started variable) we start Fairness test in the second thread. + // In case of better implementation we expect to have non-fair CPU time distribution between first and second invokers, + // because first invoker is given more CPU time in the first thread (at least within margin of 10 quantums). + // But CPU accounting is not supported for running callbacks, therefore we expect Fairness test to pass. + DoTestFairness(invokerPool, 2); + + future.Get().ThrowOnError(); +} + +TEST_F(TProfiledFairShareInvokerPoolTest, GetTotalWaitTimeEstimateEmptyPool) +{ + auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), 1); + + EXPECT_EQ(TDuration::Zero(), invokerPool->GetInvokerStatistics(0).TotalTimeEstimate); + + WaitFor(BIND([] { + Spin(Quantum); + }).AsyncVia(invokerPool->GetInvoker(0)).Run()).ThrowOnError(); + + EXPECT_LE(invokerPool->GetInvokerStatistics(0).TotalTimeEstimate, Quantum + Margin); +} + +TEST_F(TProfiledFairShareInvokerPoolTest, GetTotalWaitTimeEstimateStuckAction) +{ + auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), 1); + NThreading::TEvent event; + + auto action = BIND([&event]{ + event.Wait(TDuration::Seconds(100)); + }) + .AsyncVia(invokerPool->GetInvoker(0)) + .Run(); + + TDelayedExecutor::WaitForDuration(Quantum); + + auto totalTimeEstimate = invokerPool->GetInvokerStatistics(0).TotalTimeEstimate; + + EXPECT_LE(totalTimeEstimate, Quantum + Margin); + EXPECT_GE(totalTimeEstimate, Quantum - Margin); + + event.NotifyAll(); + WaitFor(std::move(action)).ThrowOnError(); +} + +TEST_F(TProfiledFairShareInvokerPoolTest, GetTotalWaitTimeEstimateRelevancyDecay) +{ + auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), 1); + // Make aggregator very forgetful. + invokerPool->UpdateActionTimeRelevancyHalflife(TDuration::Zero()); + NThreading::TEvent event; + + auto action = BIND([&event]{ + event.Wait(100 * Quantum); + }) + .AsyncVia(invokerPool->GetInvoker(0)) + .Run(); + + TDelayedExecutor::WaitForDuration(Quantum); + + auto totalTimeEstimate = invokerPool->GetInvokerStatistics(0).TotalTimeEstimate; + + EXPECT_LE(totalTimeEstimate, Quantum + Margin); + EXPECT_GE(totalTimeEstimate, Quantum - Margin); + + event.NotifyAll(); + WaitFor(std::move(action)).ThrowOnError(); + + TDelayedExecutor::WaitForDuration(Quantum); + + EXPECT_LE(invokerPool->GetInvokerStatistics(0).TotalTimeEstimate, Margin); +} + +TEST_F(TProfiledFairShareInvokerPoolTest, GetTotalWaitTimeEstimateSeveralActions) +{ + static constexpr int ActionCount = 3; + + auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), 1); + // Make aggregator never forget a sample. + invokerPool->UpdateActionTimeRelevancyHalflife(TDuration::Days(100000000000000000)); + + std::vector<NThreading::TEvent> leashes(ActionCount); + std::vector<TFuture<void>> actions; + + for (int idx = 0; idx < ActionCount; ++idx) { + actions.emplace_back(BIND([&leashes, idx] { + leashes[idx].Wait(100 * Quantum); + }) + .AsyncVia(invokerPool->GetInvoker(0)) + .Run()); + } + + auto expectedTotalTimeEstimate = TDuration::Zero(); + auto start = GetInstant(); + + for (int idx = 0; idx < ActionCount; ++idx) { + TDelayedExecutor::WaitForDuration(Quantum); + + auto statistics = invokerPool->GetInvokerStatistics(0); + auto expectedTotalTime = GetInstant() - start; + + if (idx == 0) { + expectedTotalTimeEstimate = expectedTotalTime; + } else { + expectedTotalTimeEstimate = (expectedTotalTimeEstimate * idx + expectedTotalTime) / (idx + 1.0); + } + + EXPECT_EQ(statistics.WaitingActionCount, ActionCount - idx); + + EXPECT_LE(statistics.TotalTimeEstimate, expectedTotalTimeEstimate + Margin) + << TError("Index: %v.", idx).GetMessage(); + EXPECT_GE(statistics.TotalTimeEstimate, expectedTotalTimeEstimate - Margin) + << TError("Index: %v.", idx).GetMessage(); + + leashes[idx].NotifyOne(); + WaitFor(std::move(actions[idx])).ThrowOnError(); + } +} + +TEST_F(TProfiledFairShareInvokerPoolTest, GetTotalWaitEstimateUncorrelatedWithOtherInvokers) +{ + auto executionOrderEnforcer = [] (int suggestedStep) { + static int realStep = 0; + EXPECT_EQ(realStep, suggestedStep); + ++realStep; + }; + auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), 2); + // Make aggregator never forget a sample. + invokerPool->UpdateActionTimeRelevancyHalflife(TDuration::Days(100000000000000000)); + + std::vector<NThreading::TEvent> leashes(2); + std::vector<TFuture<void>> actions; + + for (int idx = 0; idx < 2; ++idx) { + actions.emplace_back(BIND([&executionOrderEnforcer, &leashes, idx] { + if (idx == 0) { + executionOrderEnforcer(0); + } else { + executionOrderEnforcer(2); + } + leashes[idx].Wait(100 * Quantum); + }) + .AsyncVia(invokerPool->GetInvoker(0)) + .Run()); + } + + NThreading::TEvent secondaryLeash; + auto secondaryAction = BIND([&executionOrderEnforcer, &secondaryLeash] { + executionOrderEnforcer(1); + secondaryLeash.Wait(100 * Quantum); + }).AsyncVia(invokerPool->GetInvoker(1)).Run(); + + auto start = GetInstant(); + + TDelayedExecutor::WaitForDuration(Quantum); + + auto statistics = invokerPool->GetInvokerStatistics(0); + auto expectedTotalTimeEstimate = GetInstant() - start; + + EXPECT_EQ(statistics.WaitingActionCount, 2); + EXPECT_LE(statistics.TotalTimeEstimate, expectedTotalTimeEstimate + Margin); + EXPECT_GE(statistics.TotalTimeEstimate, expectedTotalTimeEstimate - Margin); + + leashes[0].NotifyOne(); + WaitFor(std::move(actions[0])).ThrowOnError(); + + // Second action will not be executed until the secondary action is released. + + leashes[1].NotifyOne(); + TDelayedExecutor::WaitForDuration(10 * Quantum); + EXPECT_FALSE(actions[1].IsSet()); + + // Release Secondary action. + + auto secondaryStatistics = invokerPool->GetInvokerStatistics(1); + auto secondaryWaitTime = GetInstant() - start; + + EXPECT_EQ(secondaryStatistics.WaitingActionCount, 1); + EXPECT_LE(secondaryStatistics.TotalTimeEstimate, secondaryWaitTime + Margin); + EXPECT_GE(secondaryStatistics.TotalTimeEstimate, secondaryWaitTime - Margin); + + secondaryLeash.NotifyOne(); + WaitFor(std::move(secondaryAction)).ThrowOnError(); + WaitFor(std::move(actions[1])).ThrowOnError(); + + statistics = invokerPool->GetInvokerStatistics(0); + expectedTotalTimeEstimate = (expectedTotalTimeEstimate + (GetInstant() - start)) / 3.0; + + EXPECT_EQ(statistics.WaitingActionCount, 0); + EXPECT_LE(statistics.TotalTimeEstimate, expectedTotalTimeEstimate + Margin); + EXPECT_GE(statistics.TotalTimeEstimate, expectedTotalTimeEstimate - Margin); +} + +//////////////////////////////////////////////////////////////////////////////// + +// TL;DR: +// 1) We make solomon exporter and profiled queue with custom registry so we can read counters. +// 2) We enqueue a bunch of actions into each invoker and wait for them to complete. +// 3) We read json from exporter, check that every sensor and tagset is present. +// 4) We convert json to yson to list of sensor. +// 5) We check that: +// 5.1) There is exactly one tagless enqueue/dequeue sensor containing total number of actions in "value"; +// 5.2) There is exactly one enqueue/dequeue sensor with tag "thread":threadName +// containing total number of actions in "value"; +// 5.3) There is exactly one enqueue/dequeue sensor for each tagset {"thread":threadName, "bucket":"invoker_i"} +// containing i number of actions in "value" for i from 0 to totalInvokerCount. +class TProfiledFairShareInvokerPoolProfilingTest + : public TProfiledFairShareInvokerPoolTest +{ +public: + TSolomonExporterConfigPtr CreateExporterConfig() + { + auto config = New<TSolomonExporterConfig>(); + config->GridStep = TDuration::Seconds(1); + config->EnableCoreProfilingCompatibility = true; + config->EnableSelfProfiling = false; + + return config; + } + + template <class F> + void SyncRunCallback(IInvokerPtr invoker, F&& func) + { + WaitFor(BIND(std::forward<F>(func)).AsyncVia(invoker).Run()).ThrowOnError(); + } + + auto GetSensors(TString json) + { + for (auto& c : json) { + if (c == ':') { + c = '='; + } else if (c == ',') { + c = ';'; + } + } + + auto yson = NYson::TYsonString(json); + + auto list = NYTree::ConvertToNode(yson)->AsMap()->FindChild("sensors"); + + EXPECT_TRUE(list); + + return list->AsList()->GetChildren(); + } + + // Implementation detail is that if sensor present + // for one tagset, it is present for all of them + // thus I can't be bothered verifying it here. + void VerifyCountersArePresent(int invokerCount, TString json) + { + TString sensorPrefix = "\"sensor\":\"yt.fair_share_invoker_pool"; + TString bucketPrefix = "\"bucket\":\"invoker_"; + TString threadName = "\"thread\":\"TestInvokerPool\""; + + auto checker = [&] (const TString& pattern) { + EXPECT_TRUE(json.Contains(pattern)) + << TError("Pattern %v is missing", pattern).GetMessage(); + }; + + checker(sensorPrefix + ".size\""); + checker(sensorPrefix + ".dequeued\""); + checker(sensorPrefix + ".enqueued\""); + checker(sensorPrefix + ".time.wait.max\""); + checker(sensorPrefix + ".time.exec.max\""); + checker(sensorPrefix + ".time.cumulative\""); + checker(sensorPrefix + ".time.total.max\""); + checker(threadName); + + for (int i = 0; i < invokerCount; ++i) { + checker(bucketPrefix + ToString(i) + "\""); + } + } + + void VerifyJson(int invokerCount, TString json) + { + VerifyCountersArePresent(invokerCount, json); + + THashMap<TString, int> invokerNameToEnqueued; + bool taglessEnqueuedPresent = false; + bool taglessDequeuedPresent = false; + bool threadOnlyTagEnqueuedPresent = false; + bool threadOnlyTagDequeuedPresent = false; + + int totalActions = 0; + + for (int i = 0; i < invokerCount; ++i) { + invokerNameToEnqueued.emplace("invoker_" + ToString(i), i); + totalActions += i; + } + + THashMap<TString, int> invokerNameToDequeued = invokerNameToEnqueued; + + for (const auto& entry : GetSensors(json)) { + auto mapEntry = entry->AsMap(); + auto labels = mapEntry->FindChild("labels")->AsMap(); + + auto sensor = labels->FindChildValue<TString>("sensor"); + + if (!sensor || + !(sensor == "yt.fair_share_invoker_pool.dequeued" || + sensor == "yt.fair_share_invoker_pool.enqueued")) + { + continue; + } + + auto value = mapEntry->FindChildValue<int>("value"); + EXPECT_TRUE(value); + + auto& invokerNameToValue = + sensor == "yt.fair_share_invoker_pool.enqueued" ? + invokerNameToEnqueued : + invokerNameToDequeued; + + if (auto threadName = labels->FindChildValue<TString>("thread")) { + EXPECT_EQ(threadName, "TestInvokerPool"); + + if (auto bucketName = labels->FindChildValue<TString>("bucket")) { + EXPECT_TRUE(bucketName->StartsWith("invoker_")); + + EXPECT_TRUE(invokerNameToValue.contains(*bucketName)); + EXPECT_EQ(invokerNameToValue[*bucketName], *value); + invokerNameToValue.erase(*bucketName); + + continue; + } + + if (sensor == "yt.fair_share_invoker_pool.enqueued") { + EXPECT_FALSE(std::exchange(threadOnlyTagEnqueuedPresent, true)); + } else { + EXPECT_FALSE(std::exchange(threadOnlyTagDequeuedPresent, true)); + } + } else { + if (sensor == "yt.fair_share_invoker_pool.enqueued") { + EXPECT_FALSE(std::exchange(taglessEnqueuedPresent, true)); + } else { + EXPECT_FALSE(std::exchange(taglessDequeuedPresent, true)); + } + } + + EXPECT_EQ(value, totalActions); + } + + EXPECT_TRUE(threadOnlyTagEnqueuedPresent); + EXPECT_TRUE(threadOnlyTagDequeuedPresent); + EXPECT_TRUE(taglessEnqueuedPresent); + EXPECT_TRUE(taglessDequeuedPresent); + + EXPECT_TRUE(invokerNameToEnqueued.empty()); + EXPECT_TRUE(invokerNameToDequeued.empty()); + } + + void TestProfiler(int invokerCount) + { + auto registry = New<TSolomonRegistry>(); + auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), invokerCount, registry); + + auto config = CreateExporterConfig(); + auto exporter = New<TSolomonExporter>(config, registry); + + exporter->Start(); + + for (int invokerIdx = 0; invokerIdx < invokerCount; ++invokerIdx) { + for (int actionIndex = 0; actionIndex < invokerIdx; ++actionIndex) { + SyncRunCallback(invokerPool->GetInvoker(invokerIdx), []{}); + } + } + + Sleep(TDuration::Seconds(5)); + + auto json = exporter->ReadJson(); + ASSERT_TRUE(json); + + exporter->Stop(); + + VerifyJson(invokerCount, std::move(*json)); + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +TEST_F(TProfiledFairShareInvokerPoolProfilingTest, TestProfilerGenericFairShareInvokerPool1) +{ + TestProfiler(1); +} + +TEST_F(TProfiledFairShareInvokerPoolProfilingTest, TestProfilerGenericFairShareInvokerPool2) +{ + TestProfiler(2); +} + +TEST_F(TProfiledFairShareInvokerPoolProfilingTest, TestProfilerGenericFairShareInvokerPool4) +{ + TestProfiler(4); +} + +TEST_F(TProfiledFairShareInvokerPoolProfilingTest, TestProfilerGenericFairShareInvokerPool10) +{ + TestProfiler(10); +} + +//////////////////////////////////////////////////////////////////////////////// + +DEFINE_ENUM(EInvokerBuckets, + ((Apple) (1)) + ((Pear) (2)) + ((Orange) (3)) + ((Watermelon) (4)) +); + +TEST_F(TProfiledFairShareInvokerPoolProfilingTest, TestEnumIndexedProfilerGenericFairShareInvokerPool) +{ + auto registry = New<TSolomonRegistry>(); + auto invokerPool = CreateEnumIndexedProfiledFairShareInvokerPool<EInvokerBuckets>( + Queues_[0]->GetInvoker(), + [this] (int bucketCount) { + YT_VERIFY(bucketCount > 0); + MockCallbackQueue = New<TMockFairShareCallbackQueue>(bucketCount); + return MockCallbackQueue; + }, + TAdjustedExponentialMovingAverage::DefaultHalflife, + "TestInvokerPool", + registry); + + auto config = CreateExporterConfig(); + auto exporter = New<TSolomonExporter>(config, registry); + + exporter->Start(); + + for (int invokerIdx = 0; invokerIdx < TEnumTraits<EInvokerBuckets>::GetDomainSize(); ++invokerIdx) { + SyncRunCallback(invokerPool->GetInvoker(invokerIdx), []{}); + } + + Sleep(TDuration::Seconds(5)); + + auto json = exporter->ReadJson(); + ASSERT_TRUE(json); + + exporter->Stop(); + + TEnumIndexedArray<EInvokerBuckets, bool> mentions; + std::ranges::fill(mentions, false); + + for (const auto& sensor : GetSensors(*json)) { + auto labels = sensor->AsMap()->FindChild("labels")->AsMap(); + + if (auto bucketName = labels->FindChildValue<TString>("bucket")) { + mentions[TEnumTraits<EInvokerBuckets>::FromString(*bucketName)] = true; + + if (auto sensorName = labels->FindChildValue<TString>("sensor"); + sensorName && + sensorName == "yt.fair_share_invoker_pool.dequeued") + { + auto value = sensor->AsMap()->FindChildValue<int>("value"); + + EXPECT_TRUE(value); + EXPECT_EQ(*value, 1); + } + } + } + + EXPECT_TRUE(std::ranges::all_of(mentions, std::identity{})); +} + +} // namespace +} // namespace NYT::NConcurrency diff --git a/yt/yt/core/concurrency/unittests/ya.make b/yt/yt/core/concurrency/unittests/ya.make index d1d86adfe5..b8cad1eafd 100644 --- a/yt/yt/core/concurrency/unittests/ya.make +++ b/yt/yt/core/concurrency/unittests/ya.make @@ -26,6 +26,7 @@ SRCS( nonblocking_batcher_ut.cpp nonblocking_queue_ut.cpp periodic_ut.cpp + profiled_fair_share_invoker_pool_ut.cpp propagating_storage_ut.cpp quantized_executor_ut.cpp scheduled_executor_ut.cpp |