aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-03-07 13:39:40 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-03-07 13:48:37 +0300
commita5bc35bb658487b44e707d555998bbec6cb14eab (patch)
tree4df3ffcdd48c13abc65053c681eeb9ad05901f2b
parente49f2e6094ceb19570c44590cd0e5909a104b81d (diff)
downloadydb-a5bc35bb658487b44e707d555998bbec6cb14eab.tar.gz
Intermediate changes
-rw-r--r--contrib/libs/cxxsupp/libcxx/patches/00-future-2023-11-11-license-typo-acb9156266206d53134e986bb4481b900bc4db75.patch12
-rw-r--r--contrib/python/google-auth/py3/.dist-info/METADATA2
-rw-r--r--contrib/python/google-auth/py3/google/auth/_refresh_worker.py2
-rw-r--r--contrib/python/google-auth/py3/google/auth/version.py2
-rw-r--r--contrib/python/google-auth/py3/tests/test__refresh_worker.py3
-rw-r--r--contrib/python/google-auth/py3/ya.make2
-rw-r--r--contrib/python/httpcore/.dist-info/METADATA9
-rw-r--r--contrib/python/httpcore/httpcore/__init__.py2
-rw-r--r--contrib/python/httpcore/httpcore/_async/http11.py50
-rw-r--r--contrib/python/httpcore/httpcore/_models.py8
-rw-r--r--contrib/python/httpcore/httpcore/_sync/http11.py50
-rw-r--r--contrib/python/httpcore/ya.make2
-rw-r--r--contrib/python/httpx/.dist-info/METADATA15
-rw-r--r--contrib/python/httpx/httpx/__version__.py2
-rw-r--r--contrib/python/httpx/httpx/_api.py168
-rw-r--r--contrib/python/httpx/httpx/_auth.py47
-rw-r--r--contrib/python/httpx/httpx/_client.py533
-rw-r--r--contrib/python/httpx/httpx/_config.py34
-rw-r--r--contrib/python/httpx/httpx/_content.py44
-rw-r--r--contrib/python/httpx/httpx/_decoders.py20
-rw-r--r--contrib/python/httpx/httpx/_exceptions.py18
-rw-r--r--contrib/python/httpx/httpx/_main.py30
-rw-r--r--contrib/python/httpx/httpx/_models.py144
-rw-r--r--contrib/python/httpx/httpx/_multipart.py30
-rw-r--r--contrib/python/httpx/httpx/_status_codes.py4
-rw-r--r--contrib/python/httpx/httpx/_transports/asgi.py12
-rw-r--r--contrib/python/httpx/httpx/_transports/base.py14
-rw-r--r--contrib/python/httpx/httpx/_transports/default.py34
-rw-r--r--contrib/python/httpx/httpx/_transports/mock.py4
-rw-r--r--contrib/python/httpx/httpx/_transports/wsgi.py10
-rw-r--r--contrib/python/httpx/httpx/_urlparse.py26
-rw-r--r--contrib/python/httpx/httpx/_urls.py44
-rw-r--r--contrib/python/httpx/httpx/_utils.py50
-rw-r--r--contrib/python/httpx/ya.make2
-rw-r--r--contrib/python/ydb/py3/.dist-info/METADATA2
-rw-r--r--contrib/python/ydb/py3/ya.make2
-rw-r--r--contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py2
-rw-r--r--contrib/python/ydb/py3/ydb/connection.py9
-rw-r--r--contrib/python/ydb/py3/ydb/ydb_version.py2
-rw-r--r--yt/yt/client/table_client/config.cpp2
-rw-r--r--yt/yt/core/actions/invoker_pool.h1
-rw-r--r--yt/yt/core/concurrency/fair_share_invoker_pool-inl.h40
-rw-r--r--yt/yt/core/concurrency/fair_share_invoker_pool.cpp276
-rw-r--r--yt/yt/core/concurrency/fair_share_invoker_pool.h30
-rw-r--r--yt/yt/core/concurrency/unittests/profiled_fair_share_invoker_pool_ut.cpp908
-rw-r--r--yt/yt/core/concurrency/unittests/ya.make1
46 files changed, 2036 insertions, 668 deletions
diff --git a/contrib/libs/cxxsupp/libcxx/patches/00-future-2023-11-11-license-typo-acb9156266206d53134e986bb4481b900bc4db75.patch b/contrib/libs/cxxsupp/libcxx/patches/00-future-2023-11-11-license-typo-acb9156266206d53134e986bb4481b900bc4db75.patch
new file mode 100644
index 0000000000..bed99946dc
--- /dev/null
+++ b/contrib/libs/cxxsupp/libcxx/patches/00-future-2023-11-11-license-typo-acb9156266206d53134e986bb4481b900bc4db75.patch
@@ -0,0 +1,12 @@
+diff --git a/include/__algorithm/ranges_equal_range.h b/include/__algorithm/ranges_equal_range.h
+index ed78cf3..1ff8856 100644
+--- a/include/__algorithm/ranges_equal_range.h
++++ b/include/__algorithm/ranges_equal_range.h
+@@ -1,6 +1,6 @@
+ //===----------------------------------------------------------------------===//
+ //
+-// Part of the LLVM __project, under the Apache License v2.0 with LLVM Exceptions.
++// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
+ // See https://llvm.org/LICENSE.txt for license information.
+ // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+ //
diff --git a/contrib/python/google-auth/py3/.dist-info/METADATA b/contrib/python/google-auth/py3/.dist-info/METADATA
index c8e96994cf..898701cac8 100644
--- a/contrib/python/google-auth/py3/.dist-info/METADATA
+++ b/contrib/python/google-auth/py3/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: google-auth
-Version: 2.28.0
+Version: 2.28.1
Summary: Google Authentication Library
Home-page: https://github.com/googleapis/google-auth-library-python
Author: Google Cloud Platform
diff --git a/contrib/python/google-auth/py3/google/auth/_refresh_worker.py b/contrib/python/google-auth/py3/google/auth/_refresh_worker.py
index 9bb0ccc2c5..674032d849 100644
--- a/contrib/python/google-auth/py3/google/auth/_refresh_worker.py
+++ b/contrib/python/google-auth/py3/google/auth/_refresh_worker.py
@@ -75,7 +75,7 @@ class RefreshThreadManager:
def __setstate__(self, state):
"""Pickle helper that deserializes the _lock attribute."""
- state["_key"] = threading.Lock()
+ state["_lock"] = threading.Lock()
self.__dict__.update(state)
diff --git a/contrib/python/google-auth/py3/google/auth/version.py b/contrib/python/google-auth/py3/google/auth/version.py
index 9672a6c412..7580efbee5 100644
--- a/contrib/python/google-auth/py3/google/auth/version.py
+++ b/contrib/python/google-auth/py3/google/auth/version.py
@@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-__version__ = "2.28.0"
+__version__ = "2.28.1"
diff --git a/contrib/python/google-auth/py3/tests/test__refresh_worker.py b/contrib/python/google-auth/py3/tests/test__refresh_worker.py
index f842b02cac..c25965d10b 100644
--- a/contrib/python/google-auth/py3/tests/test__refresh_worker.py
+++ b/contrib/python/google-auth/py3/tests/test__refresh_worker.py
@@ -150,7 +150,10 @@ def test_refresh_dead_worker():
def test_pickle():
w = _refresh_worker.RefreshThreadManager()
+ # For some reason isinstance cannot interpret threading.Lock as a type.
+ assert w._lock is not None
pickled_manager = pickle.dumps(w)
manager = pickle.loads(pickled_manager)
assert isinstance(manager, _refresh_worker.RefreshThreadManager)
+ assert manager._lock is not None
diff --git a/contrib/python/google-auth/py3/ya.make b/contrib/python/google-auth/py3/ya.make
index 7863862fdc..83e92e22ff 100644
--- a/contrib/python/google-auth/py3/ya.make
+++ b/contrib/python/google-auth/py3/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(2.28.0)
+VERSION(2.28.1)
LICENSE(Apache-2.0)
diff --git a/contrib/python/httpcore/.dist-info/METADATA b/contrib/python/httpcore/.dist-info/METADATA
index 51de714c58..7804db1a8b 100644
--- a/contrib/python/httpcore/.dist-info/METADATA
+++ b/contrib/python/httpcore/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: httpcore
-Version: 1.0.3
+Version: 1.0.4
Summary: A minimal low-level HTTP client.
Project-URL: Documentation, https://www.encode.io/httpcore
Project-URL: Homepage, https://www.encode.io/httpcore/
@@ -33,7 +33,7 @@ Requires-Dist: h2<5,>=3; extra == 'http2'
Provides-Extra: socks
Requires-Dist: socksio==1.*; extra == 'socks'
Provides-Extra: trio
-Requires-Dist: trio<0.24.0,>=0.22.0; extra == 'trio'
+Requires-Dist: trio<0.25.0,>=0.22.0; extra == 'trio'
Description-Content-Type: text/markdown
# HTTP Core
@@ -153,6 +153,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
+## 1.0.4 (February 21st, 2024)
+
+- Add `target` request extension. (#888)
+- Fix support for connection `Upgrade` and `CONNECT` when some data in the stream has been read. (#882)
+
## 1.0.3 (February 13th, 2024)
- Fix support for async cancellations. (#880)
diff --git a/contrib/python/httpcore/httpcore/__init__.py b/contrib/python/httpcore/httpcore/__init__.py
index 3709fc4080..ea2a240921 100644
--- a/contrib/python/httpcore/httpcore/__init__.py
+++ b/contrib/python/httpcore/httpcore/__init__.py
@@ -130,7 +130,7 @@ __all__ = [
"WriteError",
]
-__version__ = "1.0.3"
+__version__ = "1.0.4"
__locals = locals()
diff --git a/contrib/python/httpcore/httpcore/_async/http11.py b/contrib/python/httpcore/httpcore/_async/http11.py
index a5eb480840..0493a923dc 100644
--- a/contrib/python/httpcore/httpcore/_async/http11.py
+++ b/contrib/python/httpcore/httpcore/_async/http11.py
@@ -1,8 +1,10 @@
import enum
import logging
+import ssl
import time
from types import TracebackType
from typing import (
+ Any,
AsyncIterable,
AsyncIterator,
List,
@@ -107,6 +109,7 @@ class AsyncHTTP11Connection(AsyncConnectionInterface):
status,
reason_phrase,
headers,
+ trailing_data,
) = await self._receive_response_headers(**kwargs)
trace.return_value = (
http_version,
@@ -115,6 +118,14 @@ class AsyncHTTP11Connection(AsyncConnectionInterface):
headers,
)
+ network_stream = self._network_stream
+
+ # CONNECT or Upgrade request
+ if (status == 101) or (
+ (request.method == b"CONNECT") and (200 <= status < 300)
+ ):
+ network_stream = AsyncHTTP11UpgradeStream(network_stream, trailing_data)
+
return Response(
status=status,
headers=headers,
@@ -122,7 +133,7 @@ class AsyncHTTP11Connection(AsyncConnectionInterface):
extensions={
"http_version": http_version,
"reason_phrase": reason_phrase,
- "network_stream": self._network_stream,
+ "network_stream": network_stream,
},
)
except BaseException as exc:
@@ -167,7 +178,7 @@ class AsyncHTTP11Connection(AsyncConnectionInterface):
async def _receive_response_headers(
self, request: Request
- ) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]]]:
+ ) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]], bytes]:
timeouts = request.extensions.get("timeout", {})
timeout = timeouts.get("read", None)
@@ -187,7 +198,9 @@ class AsyncHTTP11Connection(AsyncConnectionInterface):
# raw header casing, rather than the enforced lowercase headers.
headers = event.headers.raw_items()
- return http_version, event.status_code, event.reason, headers
+ trailing_data, _ = self._h11_state.trailing_data
+
+ return http_version, event.status_code, event.reason, headers, trailing_data
async def _receive_response_body(self, request: Request) -> AsyncIterator[bytes]:
timeouts = request.extensions.get("timeout", {})
@@ -340,3 +353,34 @@ class HTTP11ConnectionByteStream:
self._closed = True
async with Trace("response_closed", logger, self._request):
await self._connection._response_closed()
+
+
+class AsyncHTTP11UpgradeStream(AsyncNetworkStream):
+ def __init__(self, stream: AsyncNetworkStream, leading_data: bytes) -> None:
+ self._stream = stream
+ self._leading_data = leading_data
+
+ async def read(self, max_bytes: int, timeout: Optional[float] = None) -> bytes:
+ if self._leading_data:
+ buffer = self._leading_data[:max_bytes]
+ self._leading_data = self._leading_data[max_bytes:]
+ return buffer
+ else:
+ return await self._stream.read(max_bytes, timeout)
+
+ async def write(self, buffer: bytes, timeout: Optional[float] = None) -> None:
+ await self._stream.write(buffer, timeout)
+
+ async def aclose(self) -> None:
+ await self._stream.aclose()
+
+ async def start_tls(
+ self,
+ ssl_context: ssl.SSLContext,
+ server_hostname: Optional[str] = None,
+ timeout: Optional[float] = None,
+ ) -> AsyncNetworkStream:
+ return await self._stream.start_tls(ssl_context, server_hostname, timeout)
+
+ def get_extra_info(self, info: str) -> Any:
+ return self._stream.get_extra_info(info)
diff --git a/contrib/python/httpcore/httpcore/_models.py b/contrib/python/httpcore/httpcore/_models.py
index 11bfcd84f0..397bd758d0 100644
--- a/contrib/python/httpcore/httpcore/_models.py
+++ b/contrib/python/httpcore/httpcore/_models.py
@@ -353,6 +353,14 @@ class Request:
)
self.extensions = {} if extensions is None else extensions
+ if "target" in self.extensions:
+ self.url = URL(
+ scheme=self.url.scheme,
+ host=self.url.host,
+ port=self.url.port,
+ target=self.extensions["target"],
+ )
+
def __repr__(self) -> str:
return f"<{self.__class__.__name__} [{self.method!r}]>"
diff --git a/contrib/python/httpcore/httpcore/_sync/http11.py b/contrib/python/httpcore/httpcore/_sync/http11.py
index e108f88b12..a74ff8e809 100644
--- a/contrib/python/httpcore/httpcore/_sync/http11.py
+++ b/contrib/python/httpcore/httpcore/_sync/http11.py
@@ -1,8 +1,10 @@
import enum
import logging
+import ssl
import time
from types import TracebackType
from typing import (
+ Any,
Iterable,
Iterator,
List,
@@ -107,6 +109,7 @@ class HTTP11Connection(ConnectionInterface):
status,
reason_phrase,
headers,
+ trailing_data,
) = self._receive_response_headers(**kwargs)
trace.return_value = (
http_version,
@@ -115,6 +118,14 @@ class HTTP11Connection(ConnectionInterface):
headers,
)
+ network_stream = self._network_stream
+
+ # CONNECT or Upgrade request
+ if (status == 101) or (
+ (request.method == b"CONNECT") and (200 <= status < 300)
+ ):
+ network_stream = HTTP11UpgradeStream(network_stream, trailing_data)
+
return Response(
status=status,
headers=headers,
@@ -122,7 +133,7 @@ class HTTP11Connection(ConnectionInterface):
extensions={
"http_version": http_version,
"reason_phrase": reason_phrase,
- "network_stream": self._network_stream,
+ "network_stream": network_stream,
},
)
except BaseException as exc:
@@ -167,7 +178,7 @@ class HTTP11Connection(ConnectionInterface):
def _receive_response_headers(
self, request: Request
- ) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]]]:
+ ) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]], bytes]:
timeouts = request.extensions.get("timeout", {})
timeout = timeouts.get("read", None)
@@ -187,7 +198,9 @@ class HTTP11Connection(ConnectionInterface):
# raw header casing, rather than the enforced lowercase headers.
headers = event.headers.raw_items()
- return http_version, event.status_code, event.reason, headers
+ trailing_data, _ = self._h11_state.trailing_data
+
+ return http_version, event.status_code, event.reason, headers, trailing_data
def _receive_response_body(self, request: Request) -> Iterator[bytes]:
timeouts = request.extensions.get("timeout", {})
@@ -340,3 +353,34 @@ class HTTP11ConnectionByteStream:
self._closed = True
with Trace("response_closed", logger, self._request):
self._connection._response_closed()
+
+
+class HTTP11UpgradeStream(NetworkStream):
+ def __init__(self, stream: NetworkStream, leading_data: bytes) -> None:
+ self._stream = stream
+ self._leading_data = leading_data
+
+ def read(self, max_bytes: int, timeout: Optional[float] = None) -> bytes:
+ if self._leading_data:
+ buffer = self._leading_data[:max_bytes]
+ self._leading_data = self._leading_data[max_bytes:]
+ return buffer
+ else:
+ return self._stream.read(max_bytes, timeout)
+
+ def write(self, buffer: bytes, timeout: Optional[float] = None) -> None:
+ self._stream.write(buffer, timeout)
+
+ def close(self) -> None:
+ self._stream.close()
+
+ def start_tls(
+ self,
+ ssl_context: ssl.SSLContext,
+ server_hostname: Optional[str] = None,
+ timeout: Optional[float] = None,
+ ) -> NetworkStream:
+ return self._stream.start_tls(ssl_context, server_hostname, timeout)
+
+ def get_extra_info(self, info: str) -> Any:
+ return self._stream.get_extra_info(info)
diff --git a/contrib/python/httpcore/ya.make b/contrib/python/httpcore/ya.make
index e8408ed893..66ea31672f 100644
--- a/contrib/python/httpcore/ya.make
+++ b/contrib/python/httpcore/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(1.0.3)
+VERSION(1.0.4)
LICENSE(BSD-3-Clause)
diff --git a/contrib/python/httpx/.dist-info/METADATA b/contrib/python/httpx/.dist-info/METADATA
index ab2b707fcd..b5ec37c7d9 100644
--- a/contrib/python/httpx/.dist-info/METADATA
+++ b/contrib/python/httpx/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: httpx
-Version: 0.26.0
+Version: 0.27.0
Summary: The next generation HTTP client.
Project-URL: Changelog, https://github.com/encode/httpx/blob/master/CHANGELOG.md
Project-URL: Documentation, https://www.python-httpx.org
@@ -194,21 +194,14 @@ inspiration around the lower-level networking details.
## Release Information
-### Added
-
-* The `proxy` argument was added. You should use the `proxy` argument instead of the deprecated `proxies`, or use `mounts=` for more complex configurations. (#2879)
-
### Deprecated
-* The `proxies` argument is now deprecated. It will still continue to work, but it will be removed in the future. (#2879)
+* The `app=...` shortcut has been deprecated. Use the explicit style of `transport=httpx.WSGITransport()` or `transport=httpx.ASGITransport()` instead.
### Fixed
-* Fix cases of double escaping of URL path components. Allow / as a safe character in the query portion. (#2990)
-* Handle `NO_PROXY` envvar cases when a fully qualified URL is supplied as the value. (#2741)
-* Allow URLs where username or password contains unescaped '@'. (#2986)
-* Ensure ASGI `raw_path` does not include URL query component. (#2999)
-* Ensure `Response.iter_text()` cannot yield empty strings. (#2998)
+* Respect the `http1` argument while configuring proxy transports. (#3023)
+* Fix RFC 2069 mode digest authentication. (#3045)
---
diff --git a/contrib/python/httpx/httpx/__version__.py b/contrib/python/httpx/httpx/__version__.py
index 3edc842c69..c121a898de 100644
--- a/contrib/python/httpx/httpx/__version__.py
+++ b/contrib/python/httpx/httpx/__version__.py
@@ -1,3 +1,3 @@
__title__ = "httpx"
__description__ = "A next generation HTTP client, for Python 3."
-__version__ = "0.26.0"
+__version__ = "0.27.0"
diff --git a/contrib/python/httpx/httpx/_api.py b/contrib/python/httpx/httpx/_api.py
index c7af947218..b5821cc49e 100644
--- a/contrib/python/httpx/httpx/_api.py
+++ b/contrib/python/httpx/httpx/_api.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import typing
from contextlib import contextmanager
@@ -25,20 +27,20 @@ def request(
method: str,
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Optional[AuthTypes] = None,
- proxy: typing.Optional[ProxyTypes] = None,
- proxies: typing.Optional[ProxiesTypes] = None,
+ params: QueryParamTypes | None = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | None = None,
+ proxy: ProxyTypes | None = None,
+ proxies: ProxiesTypes | None = None,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
follow_redirects: bool = False,
verify: VerifyTypes = True,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
trust_env: bool = True,
) -> Response:
"""
@@ -120,20 +122,20 @@ def stream(
method: str,
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Optional[AuthTypes] = None,
- proxy: typing.Optional[ProxyTypes] = None,
- proxies: typing.Optional[ProxiesTypes] = None,
+ params: QueryParamTypes | None = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | None = None,
+ proxy: ProxyTypes | None = None,
+ proxies: ProxiesTypes | None = None,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
follow_redirects: bool = False,
verify: VerifyTypes = True,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
trust_env: bool = True,
) -> typing.Iterator[Response]:
"""
@@ -173,14 +175,14 @@ def stream(
def get(
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Optional[AuthTypes] = None,
- proxy: typing.Optional[ProxyTypes] = None,
- proxies: typing.Optional[ProxiesTypes] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | None = None,
+ proxy: ProxyTypes | None = None,
+ proxies: ProxiesTypes | None = None,
follow_redirects: bool = False,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
verify: VerifyTypes = True,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
trust_env: bool = True,
@@ -213,14 +215,14 @@ def get(
def options(
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Optional[AuthTypes] = None,
- proxy: typing.Optional[ProxyTypes] = None,
- proxies: typing.Optional[ProxiesTypes] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | None = None,
+ proxy: ProxyTypes | None = None,
+ proxies: ProxiesTypes | None = None,
follow_redirects: bool = False,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
verify: VerifyTypes = True,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
trust_env: bool = True,
@@ -253,14 +255,14 @@ def options(
def head(
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Optional[AuthTypes] = None,
- proxy: typing.Optional[ProxyTypes] = None,
- proxies: typing.Optional[ProxiesTypes] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | None = None,
+ proxy: ProxyTypes | None = None,
+ proxies: ProxiesTypes | None = None,
follow_redirects: bool = False,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
verify: VerifyTypes = True,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
trust_env: bool = True,
@@ -293,18 +295,18 @@ def head(
def post(
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Optional[AuthTypes] = None,
- proxy: typing.Optional[ProxyTypes] = None,
- proxies: typing.Optional[ProxiesTypes] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | None = None,
+ proxy: ProxyTypes | None = None,
+ proxies: ProxiesTypes | None = None,
follow_redirects: bool = False,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
verify: VerifyTypes = True,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
trust_env: bool = True,
@@ -338,18 +340,18 @@ def post(
def put(
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Optional[AuthTypes] = None,
- proxy: typing.Optional[ProxyTypes] = None,
- proxies: typing.Optional[ProxiesTypes] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | None = None,
+ proxy: ProxyTypes | None = None,
+ proxies: ProxiesTypes | None = None,
follow_redirects: bool = False,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
verify: VerifyTypes = True,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
trust_env: bool = True,
@@ -383,18 +385,18 @@ def put(
def patch(
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Optional[AuthTypes] = None,
- proxy: typing.Optional[ProxyTypes] = None,
- proxies: typing.Optional[ProxiesTypes] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | None = None,
+ proxy: ProxyTypes | None = None,
+ proxies: ProxiesTypes | None = None,
follow_redirects: bool = False,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
verify: VerifyTypes = True,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
trust_env: bool = True,
@@ -428,14 +430,14 @@ def patch(
def delete(
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Optional[AuthTypes] = None,
- proxy: typing.Optional[ProxyTypes] = None,
- proxies: typing.Optional[ProxiesTypes] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | None = None,
+ proxy: ProxyTypes | None = None,
+ proxies: ProxiesTypes | None = None,
follow_redirects: bool = False,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
verify: VerifyTypes = True,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
trust_env: bool = True,
diff --git a/contrib/python/httpx/httpx/_auth.py b/contrib/python/httpx/httpx/_auth.py
index 66132500ff..903e399617 100644
--- a/contrib/python/httpx/httpx/_auth.py
+++ b/contrib/python/httpx/httpx/_auth.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import hashlib
import os
import re
@@ -124,18 +126,14 @@ class BasicAuth(Auth):
and uses HTTP Basic authentication.
"""
- def __init__(
- self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
- ) -> None:
+ def __init__(self, username: str | bytes, password: str | bytes) -> None:
self._auth_header = self._build_auth_header(username, password)
def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
request.headers["Authorization"] = self._auth_header
yield request
- def _build_auth_header(
- self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
- ) -> str:
+ def _build_auth_header(self, username: str | bytes, password: str | bytes) -> str:
userpass = b":".join((to_bytes(username), to_bytes(password)))
token = b64encode(userpass).decode()
return f"Basic {token}"
@@ -146,7 +144,7 @@ class NetRCAuth(Auth):
Use a 'netrc' file to lookup basic auth credentials based on the url host.
"""
- def __init__(self, file: typing.Optional[str] = None) -> None:
+ def __init__(self, file: str | None = None) -> None:
# Lazily import 'netrc'.
# There's no need for us to load this module unless 'NetRCAuth' is being used.
import netrc
@@ -165,16 +163,14 @@ class NetRCAuth(Auth):
)
yield request
- def _build_auth_header(
- self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
- ) -> str:
+ def _build_auth_header(self, username: str | bytes, password: str | bytes) -> str:
userpass = b":".join((to_bytes(username), to_bytes(password)))
token = b64encode(userpass).decode()
return f"Basic {token}"
class DigestAuth(Auth):
- _ALGORITHM_TO_HASH_FUNCTION: typing.Dict[str, typing.Callable[[bytes], "_Hash"]] = {
+ _ALGORITHM_TO_HASH_FUNCTION: dict[str, typing.Callable[[bytes], _Hash]] = {
"MD5": hashlib.md5,
"MD5-SESS": hashlib.md5,
"SHA": hashlib.sha1,
@@ -185,12 +181,10 @@ class DigestAuth(Auth):
"SHA-512-SESS": hashlib.sha512,
}
- def __init__(
- self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
- ) -> None:
+ def __init__(self, username: str | bytes, password: str | bytes) -> None:
self._username = to_bytes(username)
self._password = to_bytes(password)
- self._last_challenge: typing.Optional[_DigestAuthChallenge] = None
+ self._last_challenge: _DigestAuthChallenge | None = None
self._nonce_count = 1
def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
@@ -226,7 +220,7 @@ class DigestAuth(Auth):
def _parse_challenge(
self, request: Request, response: Response, auth_header: str
- ) -> "_DigestAuthChallenge":
+ ) -> _DigestAuthChallenge:
"""
Returns a challenge from a Digest WWW-Authenticate header.
These take the form of:
@@ -237,7 +231,7 @@ class DigestAuth(Auth):
# This method should only ever have been called with a Digest auth header.
assert scheme.lower() == "digest"
- header_dict: typing.Dict[str, str] = {}
+ header_dict: dict[str, str] = {}
for field in parse_http_list(fields):
key, value = field.strip().split("=", 1)
header_dict[key] = unquote(value)
@@ -256,7 +250,7 @@ class DigestAuth(Auth):
raise ProtocolError(message, request=request) from exc
def _build_auth_header(
- self, request: Request, challenge: "_DigestAuthChallenge"
+ self, request: Request, challenge: _DigestAuthChallenge
) -> str:
hash_func = self._ALGORITHM_TO_HASH_FUNCTION[challenge.algorithm.upper()]
@@ -280,17 +274,18 @@ class DigestAuth(Auth):
qop = self._resolve_qop(challenge.qop, request=request)
if qop is None:
+ # Following RFC 2069
digest_data = [HA1, challenge.nonce, HA2]
else:
- digest_data = [challenge.nonce, nc_value, cnonce, qop, HA2]
- key_digest = b":".join(digest_data)
+ # Following RFC 2617/7616
+ digest_data = [HA1, challenge.nonce, nc_value, cnonce, qop, HA2]
format_args = {
"username": self._username,
"realm": challenge.realm,
"nonce": challenge.nonce,
"uri": path,
- "response": digest(b":".join((HA1, key_digest))),
+ "response": digest(b":".join(digest_data)),
"algorithm": challenge.algorithm.encode(),
}
if challenge.opaque:
@@ -310,7 +305,7 @@ class DigestAuth(Auth):
return hashlib.sha1(s).hexdigest()[:16].encode()
- def _get_header_value(self, header_fields: typing.Dict[str, bytes]) -> str:
+ def _get_header_value(self, header_fields: dict[str, bytes]) -> str:
NON_QUOTED_FIELDS = ("algorithm", "qop", "nc")
QUOTED_TEMPLATE = '{}="{}"'
NON_QUOTED_TEMPLATE = "{}={}"
@@ -328,9 +323,7 @@ class DigestAuth(Auth):
return header_value
- def _resolve_qop(
- self, qop: typing.Optional[bytes], request: Request
- ) -> typing.Optional[bytes]:
+ def _resolve_qop(self, qop: bytes | None, request: Request) -> bytes | None:
if qop is None:
return None
qops = re.split(b", ?", qop)
@@ -348,5 +341,5 @@ class _DigestAuthChallenge(typing.NamedTuple):
realm: bytes
nonce: bytes
algorithm: str
- opaque: typing.Optional[bytes]
- qop: typing.Optional[bytes]
+ opaque: bytes | None
+ qop: bytes | None
diff --git a/contrib/python/httpx/httpx/_client.py b/contrib/python/httpx/httpx/_client.py
index 2813a84f01..e2c6702e0c 100644
--- a/contrib/python/httpx/httpx/_client.py
+++ b/contrib/python/httpx/httpx/_client.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import datetime
import enum
import logging
@@ -160,19 +162,17 @@ class BaseClient:
def __init__(
self,
*,
- auth: typing.Optional[AuthTypes] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
+ auth: AuthTypes | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
follow_redirects: bool = False,
max_redirects: int = DEFAULT_MAX_REDIRECTS,
- event_hooks: typing.Optional[
- typing.Mapping[str, typing.List[EventHook]]
- ] = None,
+ event_hooks: None | (typing.Mapping[str, list[EventHook]]) = None,
base_url: URLTypes = "",
trust_env: bool = True,
- default_encoding: typing.Union[str, typing.Callable[[bytes], str]] = "utf-8",
+ default_encoding: str | typing.Callable[[bytes], str] = "utf-8",
) -> None:
event_hooks = {} if event_hooks is None else event_hooks
@@ -210,8 +210,8 @@ class BaseClient:
return url.copy_with(raw_path=url.raw_path + b"/")
def _get_proxy_map(
- self, proxies: typing.Optional[ProxiesTypes], allow_env_proxies: bool
- ) -> typing.Dict[str, typing.Optional[Proxy]]:
+ self, proxies: ProxiesTypes | None, allow_env_proxies: bool
+ ) -> dict[str, Proxy | None]:
if proxies is None:
if allow_env_proxies:
return {
@@ -238,20 +238,18 @@ class BaseClient:
self._timeout = Timeout(timeout)
@property
- def event_hooks(self) -> typing.Dict[str, typing.List[EventHook]]:
+ def event_hooks(self) -> dict[str, list[EventHook]]:
return self._event_hooks
@event_hooks.setter
- def event_hooks(
- self, event_hooks: typing.Dict[str, typing.List[EventHook]]
- ) -> None:
+ def event_hooks(self, event_hooks: dict[str, list[EventHook]]) -> None:
self._event_hooks = {
"request": list(event_hooks.get("request", [])),
"response": list(event_hooks.get("response", [])),
}
@property
- def auth(self) -> typing.Optional[Auth]:
+ def auth(self) -> Auth | None:
"""
Authentication class used when none is passed at the request-level.
@@ -323,15 +321,15 @@ class BaseClient:
method: str,
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Request:
"""
Build and return a request instance.
@@ -391,9 +389,7 @@ class BaseClient:
return self.base_url.copy_with(raw_path=merge_raw_path)
return merge_url
- def _merge_cookies(
- self, cookies: typing.Optional[CookieTypes] = None
- ) -> typing.Optional[CookieTypes]:
+ def _merge_cookies(self, cookies: CookieTypes | None = None) -> CookieTypes | None:
"""
Merge a cookies argument together with any cookies on the client,
to create the cookies used for the outgoing request.
@@ -404,9 +400,7 @@ class BaseClient:
return merged_cookies
return cookies
- def _merge_headers(
- self, headers: typing.Optional[HeaderTypes] = None
- ) -> typing.Optional[HeaderTypes]:
+ def _merge_headers(self, headers: HeaderTypes | None = None) -> HeaderTypes | None:
"""
Merge a headers argument together with any headers on the client,
to create the headers used for the outgoing request.
@@ -416,8 +410,8 @@ class BaseClient:
return merged_headers
def _merge_queryparams(
- self, params: typing.Optional[QueryParamTypes] = None
- ) -> typing.Optional[QueryParamTypes]:
+ self, params: QueryParamTypes | None = None
+ ) -> QueryParamTypes | None:
"""
Merge a queryparams argument together with any queryparams on the client,
to create the queryparams used for the outgoing request.
@@ -427,7 +421,7 @@ class BaseClient:
return merged_queryparams.merge(params)
return params
- def _build_auth(self, auth: typing.Optional[AuthTypes]) -> typing.Optional[Auth]:
+ def _build_auth(self, auth: AuthTypes | None) -> Auth | None:
if auth is None:
return None
elif isinstance(auth, tuple):
@@ -442,7 +436,7 @@ class BaseClient:
def _build_request_auth(
self,
request: Request,
- auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT,
+ auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
) -> Auth:
auth = (
self._auth if isinstance(auth, UseClientDefault) else self._build_auth(auth)
@@ -557,7 +551,7 @@ class BaseClient:
def _redirect_stream(
self, request: Request, method: str
- ) -> typing.Optional[typing.Union[SyncByteStream, AsyncByteStream]]:
+ ) -> SyncByteStream | AsyncByteStream | None:
"""
Return the body that should be used for the redirect request.
"""
@@ -598,6 +592,8 @@ class Client(BaseClient):
to authenticate the client. Either a path to an SSL certificate file, or
two-tuple of (certificate file, key file), or a three-tuple of (certificate
file, key file, password).
+ * **http2** - *(optional)* A boolean indicating if HTTP/2 support should be
+ enabled. Defaults to `False`.
* **proxy** - *(optional)* A proxy URL where all the traffic should be routed.
* **proxies** - *(optional)* A dictionary mapping proxy keys to proxy
URLs.
@@ -622,31 +618,27 @@ class Client(BaseClient):
def __init__(
self,
*,
- auth: typing.Optional[AuthTypes] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
+ auth: AuthTypes | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
verify: VerifyTypes = True,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
http1: bool = True,
http2: bool = False,
- proxy: typing.Optional[ProxyTypes] = None,
- proxies: typing.Optional[ProxiesTypes] = None,
- mounts: typing.Optional[
- typing.Mapping[str, typing.Optional[BaseTransport]]
- ] = None,
+ proxy: ProxyTypes | None = None,
+ proxies: ProxiesTypes | None = None,
+ mounts: None | (typing.Mapping[str, BaseTransport | None]) = None,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
follow_redirects: bool = False,
limits: Limits = DEFAULT_LIMITS,
max_redirects: int = DEFAULT_MAX_REDIRECTS,
- event_hooks: typing.Optional[
- typing.Mapping[str, typing.List[EventHook]]
- ] = None,
+ event_hooks: None | (typing.Mapping[str, list[EventHook]]) = None,
base_url: URLTypes = "",
- transport: typing.Optional[BaseTransport] = None,
- app: typing.Optional[typing.Callable[..., typing.Any]] = None,
+ transport: BaseTransport | None = None,
+ app: typing.Callable[..., typing.Any] | None = None,
trust_env: bool = True,
- default_encoding: typing.Union[str, typing.Callable[[bytes], str]] = "utf-8",
+ default_encoding: str | typing.Callable[[bytes], str] = "utf-8",
) -> None:
super().__init__(
auth=auth,
@@ -680,6 +672,13 @@ class Client(BaseClient):
if proxy:
raise RuntimeError("Use either `proxy` or 'proxies', not both.")
+ if app:
+ message = (
+ "The 'app' shortcut is now deprecated."
+ " Use the explicit style 'transport=WSGITransport(app=...)' instead."
+ )
+ warnings.warn(message, DeprecationWarning)
+
allow_env_proxies = trust_env and app is None and transport is None
proxy_map = self._get_proxy_map(proxies or proxy, allow_env_proxies)
@@ -693,7 +692,7 @@ class Client(BaseClient):
app=app,
trust_env=trust_env,
)
- self._mounts: typing.Dict[URLPattern, typing.Optional[BaseTransport]] = {
+ self._mounts: dict[URLPattern, BaseTransport | None] = {
URLPattern(key): None
if proxy is None
else self._init_proxy_transport(
@@ -717,12 +716,12 @@ class Client(BaseClient):
def _init_transport(
self,
verify: VerifyTypes = True,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
http1: bool = True,
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
- transport: typing.Optional[BaseTransport] = None,
- app: typing.Optional[typing.Callable[..., typing.Any]] = None,
+ transport: BaseTransport | None = None,
+ app: typing.Callable[..., typing.Any] | None = None,
trust_env: bool = True,
) -> BaseTransport:
if transport is not None:
@@ -744,7 +743,7 @@ class Client(BaseClient):
self,
proxy: Proxy,
verify: VerifyTypes = True,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
http1: bool = True,
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
@@ -776,17 +775,17 @@ class Client(BaseClient):
method: str,
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Build and send a request.
@@ -833,17 +832,17 @@ class Client(BaseClient):
method: str,
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> typing.Iterator[Response]:
"""
Alternative to `httpx.request()` that streams the response body
@@ -884,8 +883,8 @@ class Client(BaseClient):
request: Request,
*,
stream: bool = False,
- auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
+ auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
) -> Response:
"""
Send a request.
@@ -933,7 +932,7 @@ class Client(BaseClient):
request: Request,
auth: Auth,
follow_redirects: bool,
- history: typing.List[Response],
+ history: list[Response],
) -> Response:
auth_flow = auth.sync_auth_flow(request)
try:
@@ -966,7 +965,7 @@ class Client(BaseClient):
self,
request: Request,
follow_redirects: bool,
- history: typing.List[Response],
+ history: list[Response],
) -> Response:
while True:
if len(history) > self.max_redirects:
@@ -1039,13 +1038,13 @@ class Client(BaseClient):
self,
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `GET` request.
@@ -1068,13 +1067,13 @@ class Client(BaseClient):
self,
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send an `OPTIONS` request.
@@ -1097,13 +1096,13 @@ class Client(BaseClient):
self,
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `HEAD` request.
@@ -1126,17 +1125,17 @@ class Client(BaseClient):
self,
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `POST` request.
@@ -1163,17 +1162,17 @@ class Client(BaseClient):
self,
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `PUT` request.
@@ -1200,17 +1199,17 @@ class Client(BaseClient):
self,
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `PATCH` request.
@@ -1237,13 +1236,13 @@ class Client(BaseClient):
self,
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `DELETE` request.
@@ -1294,9 +1293,9 @@ class Client(BaseClient):
def __exit__(
self,
- exc_type: typing.Optional[typing.Type[BaseException]] = None,
- exc_value: typing.Optional[BaseException] = None,
- traceback: typing.Optional[TracebackType] = None,
+ exc_type: type[BaseException] | None = None,
+ exc_value: BaseException | None = None,
+ traceback: TracebackType | None = None,
) -> None:
self._state = ClientState.CLOSED
@@ -1311,6 +1310,8 @@ class AsyncClient(BaseClient):
An asynchronous HTTP client, with connection pooling, HTTP/2, redirects,
cookie persistence, etc.
+ It can be shared between tasks.
+
Usage:
```python
@@ -1362,31 +1363,28 @@ class AsyncClient(BaseClient):
def __init__(
self,
*,
- auth: typing.Optional[AuthTypes] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
+ auth: AuthTypes | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
verify: VerifyTypes = True,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
http1: bool = True,
http2: bool = False,
- proxy: typing.Optional[ProxyTypes] = None,
- proxies: typing.Optional[ProxiesTypes] = None,
- mounts: typing.Optional[
- typing.Mapping[str, typing.Optional[AsyncBaseTransport]]
- ] = None,
+ proxy: ProxyTypes | None = None,
+ proxies: ProxiesTypes | None = None,
+ mounts: None | (typing.Mapping[str, AsyncBaseTransport | None]) = None,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
follow_redirects: bool = False,
limits: Limits = DEFAULT_LIMITS,
max_redirects: int = DEFAULT_MAX_REDIRECTS,
- event_hooks: typing.Optional[
- typing.Mapping[str, typing.List[typing.Callable[..., typing.Any]]]
- ] = None,
+ event_hooks: None
+ | (typing.Mapping[str, list[typing.Callable[..., typing.Any]]]) = None,
base_url: URLTypes = "",
- transport: typing.Optional[AsyncBaseTransport] = None,
- app: typing.Optional[typing.Callable[..., typing.Any]] = None,
+ transport: AsyncBaseTransport | None = None,
+ app: typing.Callable[..., typing.Any] | None = None,
trust_env: bool = True,
- default_encoding: typing.Union[str, typing.Callable[[bytes], str]] = "utf-8",
+ default_encoding: str | typing.Callable[[bytes], str] = "utf-8",
) -> None:
super().__init__(
auth=auth,
@@ -1420,7 +1418,14 @@ class AsyncClient(BaseClient):
if proxy:
raise RuntimeError("Use either `proxy` or 'proxies', not both.")
- allow_env_proxies = trust_env and app is None and transport is None
+ if app:
+ message = (
+ "The 'app' shortcut is now deprecated."
+ " Use the explicit style 'transport=ASGITransport(app=...)' instead."
+ )
+ warnings.warn(message, DeprecationWarning)
+
+ allow_env_proxies = trust_env and transport is None
proxy_map = self._get_proxy_map(proxies or proxy, allow_env_proxies)
self._transport = self._init_transport(
@@ -1434,7 +1439,7 @@ class AsyncClient(BaseClient):
trust_env=trust_env,
)
- self._mounts: typing.Dict[URLPattern, typing.Optional[AsyncBaseTransport]] = {
+ self._mounts: dict[URLPattern, AsyncBaseTransport | None] = {
URLPattern(key): None
if proxy is None
else self._init_proxy_transport(
@@ -1457,12 +1462,12 @@ class AsyncClient(BaseClient):
def _init_transport(
self,
verify: VerifyTypes = True,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
http1: bool = True,
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
- transport: typing.Optional[AsyncBaseTransport] = None,
- app: typing.Optional[typing.Callable[..., typing.Any]] = None,
+ transport: AsyncBaseTransport | None = None,
+ app: typing.Callable[..., typing.Any] | None = None,
trust_env: bool = True,
) -> AsyncBaseTransport:
if transport is not None:
@@ -1484,7 +1489,7 @@ class AsyncClient(BaseClient):
self,
proxy: Proxy,
verify: VerifyTypes = True,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
http1: bool = True,
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
@@ -1493,6 +1498,7 @@ class AsyncClient(BaseClient):
return AsyncHTTPTransport(
verify=verify,
cert=cert,
+ http1=http1,
http2=http2,
limits=limits,
trust_env=trust_env,
@@ -1515,17 +1521,17 @@ class AsyncClient(BaseClient):
method: str,
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Build and send a request.
@@ -1543,6 +1549,15 @@ class AsyncClient(BaseClient):
[0]: /advanced/#merging-of-configuration
"""
+
+ if cookies is not None: # pragma: no cover
+ message = (
+ "Setting per-request cookies=<...> is being deprecated, because "
+ "the expected behaviour on cookie persistence is ambiguous. Set "
+ "cookies directly on the client instance instead."
+ )
+ warnings.warn(message, DeprecationWarning)
+
request = self.build_request(
method=method,
url=url,
@@ -1564,17 +1579,17 @@ class AsyncClient(BaseClient):
method: str,
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> typing.AsyncIterator[Response]:
"""
Alternative to `httpx.request()` that streams the response body
@@ -1615,8 +1630,8 @@ class AsyncClient(BaseClient):
request: Request,
*,
stream: bool = False,
- auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
+ auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
) -> Response:
"""
Send a request.
@@ -1655,7 +1670,7 @@ class AsyncClient(BaseClient):
return response
- except BaseException as exc: # pragma: no cover
+ except BaseException as exc:
await response.aclose()
raise exc
@@ -1664,7 +1679,7 @@ class AsyncClient(BaseClient):
request: Request,
auth: Auth,
follow_redirects: bool,
- history: typing.List[Response],
+ history: list[Response],
) -> Response:
auth_flow = auth.async_auth_flow(request)
try:
@@ -1697,7 +1712,7 @@ class AsyncClient(BaseClient):
self,
request: Request,
follow_redirects: bool,
- history: typing.List[Response],
+ history: list[Response],
) -> Response:
while True:
if len(history) > self.max_redirects:
@@ -1770,13 +1785,13 @@ class AsyncClient(BaseClient):
self,
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `GET` request.
@@ -1799,13 +1814,13 @@ class AsyncClient(BaseClient):
self,
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send an `OPTIONS` request.
@@ -1828,13 +1843,13 @@ class AsyncClient(BaseClient):
self,
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `HEAD` request.
@@ -1857,17 +1872,17 @@ class AsyncClient(BaseClient):
self,
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `POST` request.
@@ -1894,17 +1909,17 @@ class AsyncClient(BaseClient):
self,
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `PUT` request.
@@ -1931,17 +1946,17 @@ class AsyncClient(BaseClient):
self,
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `PATCH` request.
@@ -1968,13 +1983,13 @@ class AsyncClient(BaseClient):
self,
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `DELETE` request.
@@ -2025,9 +2040,9 @@ class AsyncClient(BaseClient):
async def __aexit__(
self,
- exc_type: typing.Optional[typing.Type[BaseException]] = None,
- exc_value: typing.Optional[BaseException] = None,
- traceback: typing.Optional[TracebackType] = None,
+ exc_type: type[BaseException] | None = None,
+ exc_value: BaseException | None = None,
+ traceback: TracebackType | None = None,
) -> None:
self._state = ClientState.CLOSED
diff --git a/contrib/python/httpx/httpx/_config.py b/contrib/python/httpx/httpx/_config.py
index d4fcfb5c0d..6a3ae8022c 100644
--- a/contrib/python/httpx/httpx/_config.py
+++ b/contrib/python/httpx/httpx/_config.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import logging
import os
import ssl
@@ -43,7 +45,7 @@ UNSET = UnsetType()
def create_ssl_context(
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
verify: VerifyTypes = True,
trust_env: bool = True,
http2: bool = False,
@@ -67,7 +69,7 @@ class SSLConfig:
def __init__(
self,
*,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
verify: VerifyTypes = True,
trust_env: bool = True,
http2: bool = False,
@@ -192,7 +194,7 @@ class SSLConfig:
ssl_context.load_cert_chain(
certfile=self.cert[0],
keyfile=self.cert[1],
- password=self.cert[2], # type: ignore
+ password=self.cert[2],
)
@@ -212,12 +214,12 @@ class Timeout:
def __init__(
self,
- timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
+ timeout: TimeoutTypes | UnsetType = UNSET,
*,
- connect: typing.Union[None, float, UnsetType] = UNSET,
- read: typing.Union[None, float, UnsetType] = UNSET,
- write: typing.Union[None, float, UnsetType] = UNSET,
- pool: typing.Union[None, float, UnsetType] = UNSET,
+ connect: None | float | UnsetType = UNSET,
+ read: None | float | UnsetType = UNSET,
+ write: None | float | UnsetType = UNSET,
+ pool: None | float | UnsetType = UNSET,
) -> None:
if isinstance(timeout, Timeout):
# Passed as a single explicit Timeout.
@@ -256,7 +258,7 @@ class Timeout:
self.write = timeout if isinstance(write, UnsetType) else write
self.pool = timeout if isinstance(pool, UnsetType) else pool
- def as_dict(self) -> typing.Dict[str, typing.Optional[float]]:
+ def as_dict(self) -> dict[str, float | None]:
return {
"connect": self.connect,
"read": self.read,
@@ -300,9 +302,9 @@ class Limits:
def __init__(
self,
*,
- max_connections: typing.Optional[int] = None,
- max_keepalive_connections: typing.Optional[int] = None,
- keepalive_expiry: typing.Optional[float] = 5.0,
+ max_connections: int | None = None,
+ max_keepalive_connections: int | None = None,
+ keepalive_expiry: float | None = 5.0,
) -> None:
self.max_connections = max_connections
self.max_keepalive_connections = max_keepalive_connections
@@ -330,9 +332,9 @@ class Proxy:
self,
url: URLTypes,
*,
- ssl_context: typing.Optional[ssl.SSLContext] = None,
- auth: typing.Optional[typing.Tuple[str, str]] = None,
- headers: typing.Optional[HeaderTypes] = None,
+ ssl_context: ssl.SSLContext | None = None,
+ auth: tuple[str, str] | None = None,
+ headers: HeaderTypes | None = None,
) -> None:
url = URL(url)
headers = Headers(headers)
@@ -351,7 +353,7 @@ class Proxy:
self.ssl_context = ssl_context
@property
- def raw_auth(self) -> typing.Optional[typing.Tuple[bytes, bytes]]:
+ def raw_auth(self) -> tuple[bytes, bytes] | None:
# The proxy authentication as raw bytes.
return (
None
diff --git a/contrib/python/httpx/httpx/_content.py b/contrib/python/httpx/httpx/_content.py
index cd0d17f171..10b574bb3d 100644
--- a/contrib/python/httpx/httpx/_content.py
+++ b/contrib/python/httpx/httpx/_content.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import inspect
import warnings
from json import dumps as json_dumps
@@ -5,13 +7,9 @@ from typing import (
Any,
AsyncIterable,
AsyncIterator,
- Dict,
Iterable,
Iterator,
Mapping,
- Optional,
- Tuple,
- Union,
)
from urllib.parse import urlencode
@@ -105,8 +103,8 @@ class UnattachedStream(AsyncByteStream, SyncByteStream):
def encode_content(
- content: Union[str, bytes, Iterable[bytes], AsyncIterable[bytes]],
-) -> Tuple[Dict[str, str], Union[SyncByteStream, AsyncByteStream]]:
+ content: str | bytes | Iterable[bytes] | AsyncIterable[bytes],
+) -> tuple[dict[str, str], SyncByteStream | AsyncByteStream]:
if isinstance(content, (bytes, str)):
body = content.encode("utf-8") if isinstance(content, str) else content
content_length = len(body)
@@ -135,7 +133,7 @@ def encode_content(
def encode_urlencoded_data(
data: RequestData,
-) -> Tuple[Dict[str, str], ByteStream]:
+) -> tuple[dict[str, str], ByteStream]:
plain_data = []
for key, value in data.items():
if isinstance(value, (list, tuple)):
@@ -150,14 +148,14 @@ def encode_urlencoded_data(
def encode_multipart_data(
- data: RequestData, files: RequestFiles, boundary: Optional[bytes]
-) -> Tuple[Dict[str, str], MultipartStream]:
+ data: RequestData, files: RequestFiles, boundary: bytes | None
+) -> tuple[dict[str, str], MultipartStream]:
multipart = MultipartStream(data=data, files=files, boundary=boundary)
headers = multipart.get_headers()
return headers, multipart
-def encode_text(text: str) -> Tuple[Dict[str, str], ByteStream]:
+def encode_text(text: str) -> tuple[dict[str, str], ByteStream]:
body = text.encode("utf-8")
content_length = str(len(body))
content_type = "text/plain; charset=utf-8"
@@ -165,7 +163,7 @@ def encode_text(text: str) -> Tuple[Dict[str, str], ByteStream]:
return headers, ByteStream(body)
-def encode_html(html: str) -> Tuple[Dict[str, str], ByteStream]:
+def encode_html(html: str) -> tuple[dict[str, str], ByteStream]:
body = html.encode("utf-8")
content_length = str(len(body))
content_type = "text/html; charset=utf-8"
@@ -173,7 +171,7 @@ def encode_html(html: str) -> Tuple[Dict[str, str], ByteStream]:
return headers, ByteStream(body)
-def encode_json(json: Any) -> Tuple[Dict[str, str], ByteStream]:
+def encode_json(json: Any) -> tuple[dict[str, str], ByteStream]:
body = json_dumps(json).encode("utf-8")
content_length = str(len(body))
content_type = "application/json"
@@ -182,12 +180,12 @@ def encode_json(json: Any) -> Tuple[Dict[str, str], ByteStream]:
def encode_request(
- content: Optional[RequestContent] = None,
- data: Optional[RequestData] = None,
- files: Optional[RequestFiles] = None,
- json: Optional[Any] = None,
- boundary: Optional[bytes] = None,
-) -> Tuple[Dict[str, str], Union[SyncByteStream, AsyncByteStream]]:
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: Any | None = None,
+ boundary: bytes | None = None,
+) -> tuple[dict[str, str], SyncByteStream | AsyncByteStream]:
"""
Handles encoding the given `content`, `data`, `files`, and `json`,
returning a two-tuple of (<headers>, <stream>).
@@ -217,11 +215,11 @@ def encode_request(
def encode_response(
- content: Optional[ResponseContent] = None,
- text: Optional[str] = None,
- html: Optional[str] = None,
- json: Optional[Any] = None,
-) -> Tuple[Dict[str, str], Union[SyncByteStream, AsyncByteStream]]:
+ content: ResponseContent | None = None,
+ text: str | None = None,
+ html: str | None = None,
+ json: Any | None = None,
+) -> tuple[dict[str, str], SyncByteStream | AsyncByteStream]:
"""
Handles encoding the given `content`, returning a two-tuple of
(<headers>, <stream>).
diff --git a/contrib/python/httpx/httpx/_decoders.py b/contrib/python/httpx/httpx/_decoders.py
index 3f507c8e04..31c72c7f7a 100644
--- a/contrib/python/httpx/httpx/_decoders.py
+++ b/contrib/python/httpx/httpx/_decoders.py
@@ -3,6 +3,8 @@ Handlers for Content-Encoding.
See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
"""
+from __future__ import annotations
+
import codecs
import io
import typing
@@ -167,11 +169,11 @@ class ByteChunker:
Handles returning byte content in fixed-size chunks.
"""
- def __init__(self, chunk_size: typing.Optional[int] = None) -> None:
+ def __init__(self, chunk_size: int | None = None) -> None:
self._buffer = io.BytesIO()
self._chunk_size = chunk_size
- def decode(self, content: bytes) -> typing.List[bytes]:
+ def decode(self, content: bytes) -> list[bytes]:
if self._chunk_size is None:
return [content] if content else []
@@ -194,7 +196,7 @@ class ByteChunker:
else:
return []
- def flush(self) -> typing.List[bytes]:
+ def flush(self) -> list[bytes]:
value = self._buffer.getvalue()
self._buffer.seek(0)
self._buffer.truncate()
@@ -206,11 +208,11 @@ class TextChunker:
Handles returning text content in fixed-size chunks.
"""
- def __init__(self, chunk_size: typing.Optional[int] = None) -> None:
+ def __init__(self, chunk_size: int | None = None) -> None:
self._buffer = io.StringIO()
self._chunk_size = chunk_size
- def decode(self, content: str) -> typing.List[str]:
+ def decode(self, content: str) -> list[str]:
if self._chunk_size is None:
return [content] if content else []
@@ -233,7 +235,7 @@ class TextChunker:
else:
return []
- def flush(self) -> typing.List[str]:
+ def flush(self) -> list[str]:
value = self._buffer.getvalue()
self._buffer.seek(0)
self._buffer.truncate()
@@ -264,10 +266,10 @@ class LineDecoder:
"""
def __init__(self) -> None:
- self.buffer: typing.List[str] = []
+ self.buffer: list[str] = []
self.trailing_cr: bool = False
- def decode(self, text: str) -> typing.List[str]:
+ def decode(self, text: str) -> list[str]:
# See https://docs.python.org/3/library/stdtypes.html#str.splitlines
NEWLINE_CHARS = "\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029"
@@ -305,7 +307,7 @@ class LineDecoder:
return lines
- def flush(self) -> typing.List[str]:
+ def flush(self) -> list[str]:
if not self.buffer and not self.trailing_cr:
return []
diff --git a/contrib/python/httpx/httpx/_exceptions.py b/contrib/python/httpx/httpx/_exceptions.py
index 123692955b..11424621c0 100644
--- a/contrib/python/httpx/httpx/_exceptions.py
+++ b/contrib/python/httpx/httpx/_exceptions.py
@@ -30,6 +30,8 @@ Our exception hierarchy:
x ResponseNotRead
x RequestNotRead
"""
+from __future__ import annotations
+
import contextlib
import typing
@@ -57,16 +59,16 @@ class HTTPError(Exception):
def __init__(self, message: str) -> None:
super().__init__(message)
- self._request: typing.Optional["Request"] = None
+ self._request: Request | None = None
@property
- def request(self) -> "Request":
+ def request(self) -> Request:
if self._request is None:
raise RuntimeError("The .request property has not been set.")
return self._request
@request.setter
- def request(self, request: "Request") -> None:
+ def request(self, request: Request) -> None:
self._request = request
@@ -75,9 +77,7 @@ class RequestError(HTTPError):
Base class for all exceptions that may occur when issuing a `.request()`.
"""
- def __init__(
- self, message: str, *, request: typing.Optional["Request"] = None
- ) -> None:
+ def __init__(self, message: str, *, request: Request | None = None) -> None:
super().__init__(message)
# At the point an exception is raised we won't typically have a request
# instance to associate it with.
@@ -230,9 +230,7 @@ class HTTPStatusError(HTTPError):
May be raised when calling `response.raise_for_status()`
"""
- def __init__(
- self, message: str, *, request: "Request", response: "Response"
- ) -> None:
+ def __init__(self, message: str, *, request: Request, response: Response) -> None:
super().__init__(message)
self.request = request
self.response = response
@@ -335,7 +333,7 @@ class RequestNotRead(StreamError):
@contextlib.contextmanager
def request_context(
- request: typing.Optional["Request"] = None,
+ request: Request | None = None,
) -> typing.Iterator[None]:
"""
A context manager that can be used to attach the given request context
diff --git a/contrib/python/httpx/httpx/_main.py b/contrib/python/httpx/httpx/_main.py
index adb57d5fc0..72657f8ca3 100644
--- a/contrib/python/httpx/httpx/_main.py
+++ b/contrib/python/httpx/httpx/_main.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import functools
import json
import sys
@@ -125,8 +127,8 @@ def format_request_headers(request: httpcore.Request, http2: bool = False) -> st
def format_response_headers(
http_version: bytes,
status: int,
- reason_phrase: typing.Optional[bytes],
- headers: typing.List[typing.Tuple[bytes, bytes]],
+ reason_phrase: bytes | None,
+ headers: list[tuple[bytes, bytes]],
) -> str:
version = http_version.decode("ascii")
reason = (
@@ -152,8 +154,8 @@ def print_request_headers(request: httpcore.Request, http2: bool = False) -> Non
def print_response_headers(
http_version: bytes,
status: int,
- reason_phrase: typing.Optional[bytes],
- headers: typing.List[typing.Tuple[bytes, bytes]],
+ reason_phrase: bytes | None,
+ headers: list[tuple[bytes, bytes]],
) -> None:
console = rich.console.Console()
http_text = format_response_headers(http_version, status, reason_phrase, headers)
@@ -268,7 +270,7 @@ def download_response(response: Response, download: typing.BinaryIO) -> None:
def validate_json(
ctx: click.Context,
- param: typing.Union[click.Option, click.Parameter],
+ param: click.Option | click.Parameter,
value: typing.Any,
) -> typing.Any:
if value is None:
@@ -282,7 +284,7 @@ def validate_json(
def validate_auth(
ctx: click.Context,
- param: typing.Union[click.Option, click.Parameter],
+ param: click.Option | click.Parameter,
value: typing.Any,
) -> typing.Any:
if value == (None, None):
@@ -296,7 +298,7 @@ def validate_auth(
def handle_help(
ctx: click.Context,
- param: typing.Union[click.Option, click.Parameter],
+ param: click.Option | click.Parameter,
value: typing.Any,
) -> None:
if not value or ctx.resilient_parsing:
@@ -448,20 +450,20 @@ def handle_help(
def main(
url: str,
method: str,
- params: typing.List[typing.Tuple[str, str]],
+ params: list[tuple[str, str]],
content: str,
- data: typing.List[typing.Tuple[str, str]],
- files: typing.List[typing.Tuple[str, click.File]],
+ data: list[tuple[str, str]],
+ files: list[tuple[str, click.File]],
json: str,
- headers: typing.List[typing.Tuple[str, str]],
- cookies: typing.List[typing.Tuple[str, str]],
- auth: typing.Optional[typing.Tuple[str, str]],
+ headers: list[tuple[str, str]],
+ cookies: list[tuple[str, str]],
+ auth: tuple[str, str] | None,
proxy: str,
timeout: float,
follow_redirects: bool,
verify: bool,
http2: bool,
- download: typing.Optional[typing.BinaryIO],
+ download: typing.BinaryIO | None,
verbose: bool,
) -> None:
"""
diff --git a/contrib/python/httpx/httpx/_models.py b/contrib/python/httpx/httpx/_models.py
index b8617cdab5..cd76705f1a 100644
--- a/contrib/python/httpx/httpx/_models.py
+++ b/contrib/python/httpx/httpx/_models.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import datetime
import email.message
import json as jsonlib
@@ -59,8 +61,8 @@ class Headers(typing.MutableMapping[str, str]):
def __init__(
self,
- headers: typing.Optional[HeaderTypes] = None,
- encoding: typing.Optional[str] = None,
+ headers: HeaderTypes | None = None,
+ encoding: str | None = None,
) -> None:
if headers is None:
self._list = [] # type: typing.List[typing.Tuple[bytes, bytes, bytes]]
@@ -117,7 +119,7 @@ class Headers(typing.MutableMapping[str, str]):
self._encoding = value
@property
- def raw(self) -> typing.List[typing.Tuple[bytes, bytes]]:
+ def raw(self) -> list[tuple[bytes, bytes]]:
"""
Returns a list of the raw header items, as byte pairs.
"""
@@ -127,7 +129,7 @@ class Headers(typing.MutableMapping[str, str]):
return {key.decode(self.encoding): None for _, key, value in self._list}.keys()
def values(self) -> typing.ValuesView[str]:
- values_dict: typing.Dict[str, str] = {}
+ values_dict: dict[str, str] = {}
for _, key, value in self._list:
str_key = key.decode(self.encoding)
str_value = value.decode(self.encoding)
@@ -142,7 +144,7 @@ class Headers(typing.MutableMapping[str, str]):
Return `(key, value)` items of headers. Concatenate headers
into a single comma separated value when a key occurs multiple times.
"""
- values_dict: typing.Dict[str, str] = {}
+ values_dict: dict[str, str] = {}
for _, key, value in self._list:
str_key = key.decode(self.encoding)
str_value = value.decode(self.encoding)
@@ -152,7 +154,7 @@ class Headers(typing.MutableMapping[str, str]):
values_dict[str_key] = str_value
return values_dict.items()
- def multi_items(self) -> typing.List[typing.Tuple[str, str]]:
+ def multi_items(self) -> list[tuple[str, str]]:
"""
Return a list of `(key, value)` pairs of headers. Allow multiple
occurrences of the same key without concatenating into a single
@@ -173,7 +175,7 @@ class Headers(typing.MutableMapping[str, str]):
except KeyError:
return default
- def get_list(self, key: str, split_commas: bool = False) -> typing.List[str]:
+ def get_list(self, key: str, split_commas: bool = False) -> list[str]:
"""
Return a list of all header values for a given key.
If `split_commas=True` is passed, then any comma separated header
@@ -195,14 +197,14 @@ class Headers(typing.MutableMapping[str, str]):
split_values.extend([item.strip() for item in value.split(",")])
return split_values
- def update(self, headers: typing.Optional[HeaderTypes] = None) -> None: # type: ignore
+ def update(self, headers: HeaderTypes | None = None) -> None: # type: ignore
headers = Headers(headers)
for key in headers.keys():
if key in self:
self.pop(key)
self._list.extend(headers._list)
- def copy(self) -> "Headers":
+ def copy(self) -> Headers:
return Headers(self, encoding=self.encoding)
def __getitem__(self, key: str) -> str:
@@ -306,18 +308,18 @@ class Headers(typing.MutableMapping[str, str]):
class Request:
def __init__(
self,
- method: typing.Union[str, bytes],
- url: typing.Union["URL", str],
+ method: str | bytes,
+ url: URL | str,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- stream: typing.Union[SyncByteStream, AsyncByteStream, None] = None,
- extensions: typing.Optional[RequestExtensions] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ stream: SyncByteStream | AsyncByteStream | None = None,
+ extensions: RequestExtensions | None = None,
) -> None:
self.method = (
method.decode("ascii").upper()
@@ -334,7 +336,7 @@ class Request:
Cookies(cookies).set_cookie_header(self)
if stream is None:
- content_type: typing.Optional[str] = self.headers.get("content-type")
+ content_type: str | None = self.headers.get("content-type")
headers, stream = encode_request(
content=content,
data=data,
@@ -368,14 +370,14 @@ class Request:
# * Creating request instances on the *server-side* of the transport API.
self.stream = stream
- def _prepare(self, default_headers: typing.Dict[str, str]) -> None:
+ def _prepare(self, default_headers: dict[str, str]) -> None:
for key, value in default_headers.items():
# Ignore Transfer-Encoding if the Content-Length has been set explicitly.
if key.lower() == "transfer-encoding" and "Content-Length" in self.headers:
continue
self.headers.setdefault(key, value)
- auto_headers: typing.List[typing.Tuple[bytes, bytes]] = []
+ auto_headers: list[tuple[bytes, bytes]] = []
has_host = "Host" in self.headers
has_content_length = (
@@ -428,14 +430,14 @@ class Request:
url = str(self.url)
return f"<{class_name}({self.method!r}, {url!r})>"
- def __getstate__(self) -> typing.Dict[str, typing.Any]:
+ def __getstate__(self) -> dict[str, typing.Any]:
return {
name: value
for name, value in self.__dict__.items()
if name not in ["extensions", "stream"]
}
- def __setstate__(self, state: typing.Dict[str, typing.Any]) -> None:
+ def __setstate__(self, state: dict[str, typing.Any]) -> None:
for name, value in state.items():
setattr(self, name, value)
self.extensions = {}
@@ -447,25 +449,25 @@ class Response:
self,
status_code: int,
*,
- headers: typing.Optional[HeaderTypes] = None,
- content: typing.Optional[ResponseContent] = None,
- text: typing.Optional[str] = None,
- html: typing.Optional[str] = None,
+ headers: HeaderTypes | None = None,
+ content: ResponseContent | None = None,
+ text: str | None = None,
+ html: str | None = None,
json: typing.Any = None,
- stream: typing.Union[SyncByteStream, AsyncByteStream, None] = None,
- request: typing.Optional[Request] = None,
- extensions: typing.Optional[ResponseExtensions] = None,
- history: typing.Optional[typing.List["Response"]] = None,
- default_encoding: typing.Union[str, typing.Callable[[bytes], str]] = "utf-8",
+ stream: SyncByteStream | AsyncByteStream | None = None,
+ request: Request | None = None,
+ extensions: ResponseExtensions | None = None,
+ history: list[Response] | None = None,
+ default_encoding: str | typing.Callable[[bytes], str] = "utf-8",
) -> None:
self.status_code = status_code
self.headers = Headers(headers)
- self._request: typing.Optional[Request] = request
+ self._request: Request | None = request
# When follow_redirects=False and a redirect is received,
# the client will set `response.next_request`.
- self.next_request: typing.Optional[Request] = None
+ self.next_request: Request | None = None
self.extensions: ResponseExtensions = {} if extensions is None else extensions
self.history = [] if history is None else list(history)
@@ -498,7 +500,7 @@ class Response:
self._num_bytes_downloaded = 0
- def _prepare(self, default_headers: typing.Dict[str, str]) -> None:
+ def _prepare(self, default_headers: dict[str, str]) -> None:
for key, value in default_headers.items():
# Ignore Transfer-Encoding if the Content-Length has been set explicitly.
if key.lower() == "transfer-encoding" and "content-length" in self.headers:
@@ -580,7 +582,7 @@ class Response:
return self._text
@property
- def encoding(self) -> typing.Optional[str]:
+ def encoding(self) -> str | None:
"""
Return an encoding to use for decoding the byte content into text.
The priority for determining this is given by...
@@ -616,7 +618,7 @@ class Response:
self._encoding = value
@property
- def charset_encoding(self) -> typing.Optional[str]:
+ def charset_encoding(self) -> str | None:
"""
Return the encoding, as specified by the Content-Type header.
"""
@@ -632,7 +634,7 @@ class Response:
content, depending on the Content-Encoding used in the response.
"""
if not hasattr(self, "_decoder"):
- decoders: typing.List[ContentDecoder] = []
+ decoders: list[ContentDecoder] = []
values = self.headers.get_list("content-encoding", split_commas=True)
for value in values:
value = value.strip().lower()
@@ -721,7 +723,7 @@ class Response:
and "Location" in self.headers
)
- def raise_for_status(self) -> "Response":
+ def raise_for_status(self) -> Response:
"""
Raise the `HTTPStatusError` if one occurred.
"""
@@ -762,25 +764,25 @@ class Response:
return jsonlib.loads(self.content, **kwargs)
@property
- def cookies(self) -> "Cookies":
+ def cookies(self) -> Cookies:
if not hasattr(self, "_cookies"):
self._cookies = Cookies()
self._cookies.extract_cookies(self)
return self._cookies
@property
- def links(self) -> typing.Dict[typing.Optional[str], typing.Dict[str, str]]:
+ def links(self) -> dict[str | None, dict[str, str]]:
"""
Returns the parsed header links of the response, if any
"""
header = self.headers.get("link")
- ldict = {}
- if header:
- links = parse_header_links(header)
- for link in links:
- key = link.get("rel") or link.get("url")
- ldict[key] = link
- return ldict
+ if header is None:
+ return {}
+
+ return {
+ (link.get("rel") or link.get("url")): link
+ for link in parse_header_links(header)
+ }
@property
def num_bytes_downloaded(self) -> int:
@@ -789,14 +791,14 @@ class Response:
def __repr__(self) -> str:
return f"<Response [{self.status_code} {self.reason_phrase}]>"
- def __getstate__(self) -> typing.Dict[str, typing.Any]:
+ def __getstate__(self) -> dict[str, typing.Any]:
return {
name: value
for name, value in self.__dict__.items()
if name not in ["extensions", "stream", "is_closed", "_decoder"]
}
- def __setstate__(self, state: typing.Dict[str, typing.Any]) -> None:
+ def __setstate__(self, state: dict[str, typing.Any]) -> None:
for name, value in state.items():
setattr(self, name, value)
self.is_closed = True
@@ -811,9 +813,7 @@ class Response:
self._content = b"".join(self.iter_bytes())
return self._content
- def iter_bytes(
- self, chunk_size: typing.Optional[int] = None
- ) -> typing.Iterator[bytes]:
+ def iter_bytes(self, chunk_size: int | None = None) -> typing.Iterator[bytes]:
"""
A byte-iterator over the decoded response content.
This allows us to handle gzip, deflate, and brotli encoded responses.
@@ -836,9 +836,7 @@ class Response:
for chunk in chunker.flush():
yield chunk
- def iter_text(
- self, chunk_size: typing.Optional[int] = None
- ) -> typing.Iterator[str]:
+ def iter_text(self, chunk_size: int | None = None) -> typing.Iterator[str]:
"""
A str-iterator over the decoded response content
that handles both gzip, deflate, etc but also detects the content's
@@ -866,9 +864,7 @@ class Response:
for line in decoder.flush():
yield line
- def iter_raw(
- self, chunk_size: typing.Optional[int] = None
- ) -> typing.Iterator[bytes]:
+ def iter_raw(self, chunk_size: int | None = None) -> typing.Iterator[bytes]:
"""
A byte-iterator over the raw response content.
"""
@@ -916,7 +912,7 @@ class Response:
return self._content
async def aiter_bytes(
- self, chunk_size: typing.Optional[int] = None
+ self, chunk_size: int | None = None
) -> typing.AsyncIterator[bytes]:
"""
A byte-iterator over the decoded response content.
@@ -941,7 +937,7 @@ class Response:
yield chunk
async def aiter_text(
- self, chunk_size: typing.Optional[int] = None
+ self, chunk_size: int | None = None
) -> typing.AsyncIterator[str]:
"""
A str-iterator over the decoded response content
@@ -971,7 +967,7 @@ class Response:
yield line
async def aiter_raw(
- self, chunk_size: typing.Optional[int] = None
+ self, chunk_size: int | None = None
) -> typing.AsyncIterator[bytes]:
"""
A byte-iterator over the raw response content.
@@ -1017,7 +1013,7 @@ class Cookies(typing.MutableMapping[str, str]):
HTTP Cookies, as a mutable mapping.
"""
- def __init__(self, cookies: typing.Optional[CookieTypes] = None) -> None:
+ def __init__(self, cookies: CookieTypes | None = None) -> None:
if cookies is None or isinstance(cookies, dict):
self.jar = CookieJar()
if isinstance(cookies, dict):
@@ -1079,10 +1075,10 @@ class Cookies(typing.MutableMapping[str, str]):
def get( # type: ignore
self,
name: str,
- default: typing.Optional[str] = None,
- domain: typing.Optional[str] = None,
- path: typing.Optional[str] = None,
- ) -> typing.Optional[str]:
+ default: str | None = None,
+ domain: str | None = None,
+ path: str | None = None,
+ ) -> str | None:
"""
Get a cookie by name. May optionally include domain and path
in order to specify exactly which cookie to retrieve.
@@ -1104,8 +1100,8 @@ class Cookies(typing.MutableMapping[str, str]):
def delete(
self,
name: str,
- domain: typing.Optional[str] = None,
- path: typing.Optional[str] = None,
+ domain: str | None = None,
+ path: str | None = None,
) -> None:
"""
Delete a cookie by name. May optionally include domain and path
@@ -1125,9 +1121,7 @@ class Cookies(typing.MutableMapping[str, str]):
for cookie in remove:
self.jar.clear(cookie.domain, cookie.path, cookie.name)
- def clear(
- self, domain: typing.Optional[str] = None, path: typing.Optional[str] = None
- ) -> None:
+ def clear(self, domain: str | None = None, path: str | None = None) -> None:
"""
Delete all cookies. Optionally include a domain and path in
order to only delete a subset of all the cookies.
@@ -1140,7 +1134,7 @@ class Cookies(typing.MutableMapping[str, str]):
args.append(path)
self.jar.clear(*args)
- def update(self, cookies: typing.Optional[CookieTypes] = None) -> None: # type: ignore
+ def update(self, cookies: CookieTypes | None = None) -> None: # type: ignore
cookies = Cookies(cookies)
for cookie in cookies.jar:
self.jar.set_cookie(cookie)
diff --git a/contrib/python/httpx/httpx/_multipart.py b/contrib/python/httpx/httpx/_multipart.py
index 5122d5114f..8edb622778 100644
--- a/contrib/python/httpx/httpx/_multipart.py
+++ b/contrib/python/httpx/httpx/_multipart.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import io
import os
import typing
@@ -21,8 +23,8 @@ from ._utils import (
def get_multipart_boundary_from_content_type(
- content_type: typing.Optional[bytes],
-) -> typing.Optional[bytes]:
+ content_type: bytes | None,
+) -> bytes | None:
if not content_type or not content_type.startswith(b"multipart/form-data"):
return None
# parse boundary according to
@@ -39,9 +41,7 @@ class DataField:
A single form field item, within a multipart form field.
"""
- def __init__(
- self, name: str, value: typing.Union[str, bytes, int, float, None]
- ) -> None:
+ def __init__(self, name: str, value: str | bytes | int | float | None) -> None:
if not isinstance(name, str):
raise TypeError(
f"Invalid type for name. Expected str, got {type(name)}: {name!r}"
@@ -52,7 +52,7 @@ class DataField:
f" got {type(value)}: {value!r}"
)
self.name = name
- self.value: typing.Union[str, bytes] = (
+ self.value: str | bytes = (
value if isinstance(value, bytes) else primitive_value_to_str(value)
)
@@ -93,8 +93,8 @@ class FileField:
fileobj: FileContent
- headers: typing.Dict[str, str] = {}
- content_type: typing.Optional[str] = None
+ headers: dict[str, str] = {}
+ content_type: str | None = None
# This large tuple based API largely mirror's requests' API
# It would be good to think of better APIs for this that we could
@@ -104,9 +104,9 @@ class FileField:
if len(value) == 2:
# neither the 3rd parameter (content_type) nor the 4th (headers)
# was included
- filename, fileobj = value # type: ignore
+ filename, fileobj = value
elif len(value) == 3:
- filename, fileobj, content_type = value # type: ignore
+ filename, fileobj, content_type = value
else:
# all 4 parameters included
filename, fileobj, content_type, headers = value # type: ignore
@@ -137,7 +137,7 @@ class FileField:
self.file = fileobj
self.headers = headers
- def get_length(self) -> typing.Optional[int]:
+ def get_length(self) -> int | None:
headers = self.render_headers()
if isinstance(self.file, (str, bytes)):
@@ -199,7 +199,7 @@ class MultipartStream(SyncByteStream, AsyncByteStream):
self,
data: RequestData,
files: RequestFiles,
- boundary: typing.Optional[bytes] = None,
+ boundary: bytes | None = None,
) -> None:
if boundary is None:
boundary = os.urandom(16).hex().encode("ascii")
@@ -212,7 +212,7 @@ class MultipartStream(SyncByteStream, AsyncByteStream):
def _iter_fields(
self, data: RequestData, files: RequestFiles
- ) -> typing.Iterator[typing.Union[FileField, DataField]]:
+ ) -> typing.Iterator[FileField | DataField]:
for name, value in data.items():
if isinstance(value, (tuple, list)):
for item in value:
@@ -231,7 +231,7 @@ class MultipartStream(SyncByteStream, AsyncByteStream):
yield b"\r\n"
yield b"--%s--\r\n" % self.boundary
- def get_content_length(self) -> typing.Optional[int]:
+ def get_content_length(self) -> int | None:
"""
Return the length of the multipart encoded content, or `None` if
any of the files have a length that cannot be determined upfront.
@@ -253,7 +253,7 @@ class MultipartStream(SyncByteStream, AsyncByteStream):
# Content stream interface.
- def get_headers(self) -> typing.Dict[str, str]:
+ def get_headers(self) -> dict[str, str]:
content_length = self.get_content_length()
content_type = self.content_type
if content_length is None:
diff --git a/contrib/python/httpx/httpx/_status_codes.py b/contrib/python/httpx/httpx/_status_codes.py
index 671c30e1b8..4cde4e6845 100644
--- a/contrib/python/httpx/httpx/_status_codes.py
+++ b/contrib/python/httpx/httpx/_status_codes.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
from enum import IntEnum
@@ -21,7 +23,7 @@ class codes(IntEnum):
* RFC 8470: Using Early Data in HTTP
"""
- def __new__(cls, value: int, phrase: str = "") -> "codes":
+ def __new__(cls, value: int, phrase: str = "") -> codes:
obj = int.__new__(cls, value)
obj._value_ = value
diff --git a/contrib/python/httpx/httpx/_transports/asgi.py b/contrib/python/httpx/httpx/_transports/asgi.py
index 08cd392f75..9543a12861 100644
--- a/contrib/python/httpx/httpx/_transports/asgi.py
+++ b/contrib/python/httpx/httpx/_transports/asgi.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import typing
import sniffio
@@ -24,7 +26,7 @@ _ASGIApp = typing.Callable[
]
-def create_event() -> "Event":
+def create_event() -> Event:
if sniffio.current_async_library() == "trio":
import trio
@@ -36,7 +38,7 @@ def create_event() -> "Event":
class ASGIResponseStream(AsyncByteStream):
- def __init__(self, body: typing.List[bytes]) -> None:
+ def __init__(self, body: list[bytes]) -> None:
self._body = body
async def __aiter__(self) -> typing.AsyncIterator[bytes]:
@@ -81,7 +83,7 @@ class ASGITransport(AsyncBaseTransport):
app: _ASGIApp,
raise_app_exceptions: bool = True,
root_path: str = "",
- client: typing.Tuple[str, int] = ("127.0.0.1", 123),
+ client: tuple[str, int] = ("127.0.0.1", 123),
) -> None:
self.app = app
self.raise_app_exceptions = raise_app_exceptions
@@ -123,7 +125,7 @@ class ASGITransport(AsyncBaseTransport):
# ASGI callables.
- async def receive() -> typing.Dict[str, typing.Any]:
+ async def receive() -> dict[str, typing.Any]:
nonlocal request_complete
if request_complete:
@@ -137,7 +139,7 @@ class ASGITransport(AsyncBaseTransport):
return {"type": "http.request", "body": b"", "more_body": False}
return {"type": "http.request", "body": body, "more_body": True}
- async def send(message: typing.Dict[str, typing.Any]) -> None:
+ async def send(message: dict[str, typing.Any]) -> None:
nonlocal status_code, response_headers, response_started
if message["type"] == "http.response.start":
diff --git a/contrib/python/httpx/httpx/_transports/base.py b/contrib/python/httpx/httpx/_transports/base.py
index f6fdfe6943..8b6dc3c239 100644
--- a/contrib/python/httpx/httpx/_transports/base.py
+++ b/contrib/python/httpx/httpx/_transports/base.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import typing
from types import TracebackType
@@ -13,9 +15,9 @@ class BaseTransport:
def __exit__(
self,
- exc_type: typing.Optional[typing.Type[BaseException]] = None,
- exc_value: typing.Optional[BaseException] = None,
- traceback: typing.Optional[TracebackType] = None,
+ exc_type: type[BaseException] | None = None,
+ exc_value: BaseException | None = None,
+ traceback: TracebackType | None = None,
) -> None:
self.close()
@@ -64,9 +66,9 @@ class AsyncBaseTransport:
async def __aexit__(
self,
- exc_type: typing.Optional[typing.Type[BaseException]] = None,
- exc_value: typing.Optional[BaseException] = None,
- traceback: typing.Optional[TracebackType] = None,
+ exc_type: type[BaseException] | None = None,
+ exc_value: BaseException | None = None,
+ traceback: TracebackType | None = None,
) -> None:
await self.aclose()
diff --git a/contrib/python/httpx/httpx/_transports/default.py b/contrib/python/httpx/httpx/_transports/default.py
index 14a087389a..14476a3ce3 100644
--- a/contrib/python/httpx/httpx/_transports/default.py
+++ b/contrib/python/httpx/httpx/_transports/default.py
@@ -23,6 +23,8 @@ client = httpx.Client(transport=transport)
transport = httpx.HTTPTransport(uds="socket.uds")
client = httpx.Client(transport=transport)
"""
+from __future__ import annotations
+
import contextlib
import typing
from types import TracebackType
@@ -120,16 +122,16 @@ class HTTPTransport(BaseTransport):
def __init__(
self,
verify: VerifyTypes = True,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
http1: bool = True,
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
trust_env: bool = True,
- proxy: typing.Optional[ProxyTypes] = None,
- uds: typing.Optional[str] = None,
- local_address: typing.Optional[str] = None,
+ proxy: ProxyTypes | None = None,
+ uds: str | None = None,
+ local_address: str | None = None,
retries: int = 0,
- socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
+ socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
) -> None:
ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)
proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy
@@ -202,9 +204,9 @@ class HTTPTransport(BaseTransport):
def __exit__(
self,
- exc_type: typing.Optional[typing.Type[BaseException]] = None,
- exc_value: typing.Optional[BaseException] = None,
- traceback: typing.Optional[TracebackType] = None,
+ exc_type: type[BaseException] | None = None,
+ exc_value: BaseException | None = None,
+ traceback: TracebackType | None = None,
) -> None:
with map_httpcore_exceptions():
self._pool.__exit__(exc_type, exc_value, traceback)
@@ -261,16 +263,16 @@ class AsyncHTTPTransport(AsyncBaseTransport):
def __init__(
self,
verify: VerifyTypes = True,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
http1: bool = True,
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
trust_env: bool = True,
- proxy: typing.Optional[ProxyTypes] = None,
- uds: typing.Optional[str] = None,
- local_address: typing.Optional[str] = None,
+ proxy: ProxyTypes | None = None,
+ uds: str | None = None,
+ local_address: str | None = None,
retries: int = 0,
- socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
+ socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
) -> None:
ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)
proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy
@@ -342,9 +344,9 @@ class AsyncHTTPTransport(AsyncBaseTransport):
async def __aexit__(
self,
- exc_type: typing.Optional[typing.Type[BaseException]] = None,
- exc_value: typing.Optional[BaseException] = None,
- traceback: typing.Optional[TracebackType] = None,
+ exc_type: type[BaseException] | None = None,
+ exc_value: BaseException | None = None,
+ traceback: TracebackType | None = None,
) -> None:
with map_httpcore_exceptions():
await self._pool.__aexit__(exc_type, exc_value, traceback)
diff --git a/contrib/python/httpx/httpx/_transports/mock.py b/contrib/python/httpx/httpx/_transports/mock.py
index 82043da2d9..5abea83731 100644
--- a/contrib/python/httpx/httpx/_transports/mock.py
+++ b/contrib/python/httpx/httpx/_transports/mock.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import typing
from .._models import Request, Response
@@ -8,7 +10,7 @@ AsyncHandler = typing.Callable[[Request], typing.Coroutine[None, None, Response]
class MockTransport(AsyncBaseTransport, BaseTransport):
- def __init__(self, handler: typing.Union[SyncHandler, AsyncHandler]) -> None:
+ def __init__(self, handler: SyncHandler | AsyncHandler) -> None:
self.handler = handler
def handle_request(
diff --git a/contrib/python/httpx/httpx/_transports/wsgi.py b/contrib/python/httpx/httpx/_transports/wsgi.py
index a23d42c414..cd03a9417b 100644
--- a/contrib/python/httpx/httpx/_transports/wsgi.py
+++ b/contrib/python/httpx/httpx/_transports/wsgi.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import io
import itertools
import sys
@@ -71,11 +73,11 @@ class WSGITransport(BaseTransport):
def __init__(
self,
- app: "WSGIApplication",
+ app: WSGIApplication,
raise_app_exceptions: bool = True,
script_name: str = "",
remote_addr: str = "127.0.0.1",
- wsgi_errors: typing.Optional[typing.TextIO] = None,
+ wsgi_errors: typing.TextIO | None = None,
) -> None:
self.app = app
self.raise_app_exceptions = raise_app_exceptions
@@ -117,8 +119,8 @@ class WSGITransport(BaseTransport):
def start_response(
status: str,
- response_headers: typing.List[typing.Tuple[str, str]],
- exc_info: typing.Optional["OptExcInfo"] = None,
+ response_headers: list[tuple[str, str]],
+ exc_info: OptExcInfo | None = None,
) -> typing.Callable[[bytes], typing.Any]:
nonlocal seen_status, seen_response_headers, seen_exc_info
seen_status = status
diff --git a/contrib/python/httpx/httpx/_urlparse.py b/contrib/python/httpx/httpx/_urlparse.py
index 07bbea9070..6a4b55b38c 100644
--- a/contrib/python/httpx/httpx/_urlparse.py
+++ b/contrib/python/httpx/httpx/_urlparse.py
@@ -15,6 +15,8 @@ Previously we relied on the excellent `rfc3986` package to handle URL parsing an
validation, but this module provides a simpler alternative, with less indirection
required.
"""
+from __future__ import annotations
+
import ipaddress
import re
import typing
@@ -95,10 +97,10 @@ class ParseResult(typing.NamedTuple):
scheme: str
userinfo: str
host: str
- port: typing.Optional[int]
+ port: int | None
path: str
- query: typing.Optional[str]
- fragment: typing.Optional[str]
+ query: str | None
+ fragment: str | None
@property
def authority(self) -> str:
@@ -119,7 +121,7 @@ class ParseResult(typing.NamedTuple):
]
)
- def copy_with(self, **kwargs: typing.Optional[str]) -> "ParseResult":
+ def copy_with(self, **kwargs: str | None) -> ParseResult:
if not kwargs:
return self
@@ -146,7 +148,7 @@ class ParseResult(typing.NamedTuple):
)
-def urlparse(url: str = "", **kwargs: typing.Optional[str]) -> ParseResult:
+def urlparse(url: str = "", **kwargs: str | None) -> ParseResult:
# Initial basic checks on allowable URLs.
# ---------------------------------------
@@ -243,7 +245,7 @@ def urlparse(url: str = "", **kwargs: typing.Optional[str]) -> ParseResult:
parsed_scheme: str = scheme.lower()
parsed_userinfo: str = quote(userinfo, safe=SUB_DELIMS + ":")
parsed_host: str = encode_host(host)
- parsed_port: typing.Optional[int] = normalize_port(port, scheme)
+ parsed_port: int | None = normalize_port(port, scheme)
has_scheme = parsed_scheme != ""
has_authority = (
@@ -260,11 +262,11 @@ def urlparse(url: str = "", **kwargs: typing.Optional[str]) -> ParseResult:
# For 'path' we need to drop ? and # from the GEN_DELIMS set.
parsed_path: str = quote(path, safe=SUB_DELIMS + ":/[]@")
# For 'query' we need to drop '#' from the GEN_DELIMS set.
- parsed_query: typing.Optional[str] = (
+ parsed_query: str | None = (
None if query is None else quote(query, safe=SUB_DELIMS + ":/?[]@")
)
# For 'fragment' we can include all of the GEN_DELIMS set.
- parsed_fragment: typing.Optional[str] = (
+ parsed_fragment: str | None = (
None if fragment is None else quote(fragment, safe=SUB_DELIMS + ":/?#[]@")
)
@@ -327,9 +329,7 @@ def encode_host(host: str) -> str:
raise InvalidURL(f"Invalid IDNA hostname: {host!r}")
-def normalize_port(
- port: typing.Optional[typing.Union[str, int]], scheme: str
-) -> typing.Optional[int]:
+def normalize_port(port: str | int | None, scheme: str) -> int | None:
# From https://tools.ietf.org/html/rfc3986#section-3.2.3
#
# "A scheme may define a default port. For example, the "http" scheme
@@ -393,7 +393,7 @@ def normalize_path(path: str) -> str:
"""
# https://datatracker.ietf.org/doc/html/rfc3986#section-5.2.4
components = path.split("/")
- output: typing.List[str] = []
+ output: list[str] = []
for component in components:
if component == ".":
pass
@@ -479,7 +479,7 @@ def quote(string: str, safe: str = "/") -> str:
return "".join(parts)
-def urlencode(items: typing.List[typing.Tuple[str, str]]) -> str:
+def urlencode(items: list[tuple[str, str]]) -> str:
"""
We can use a much simpler version of the stdlib urlencode here because
we don't need to handle a bunch of different typing cases, such as bytes vs str.
diff --git a/contrib/python/httpx/httpx/_urls.py b/contrib/python/httpx/httpx/_urls.py
index 26202e95db..43dedd5644 100644
--- a/contrib/python/httpx/httpx/_urls.py
+++ b/contrib/python/httpx/httpx/_urls.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import typing
from urllib.parse import parse_qs, unquote
@@ -70,9 +72,7 @@ class URL:
themselves.
"""
- def __init__(
- self, url: typing.Union["URL", str] = "", **kwargs: typing.Any
- ) -> None:
+ def __init__(self, url: URL | str = "", **kwargs: typing.Any) -> None:
if kwargs:
allowed = {
"scheme": str,
@@ -213,7 +213,7 @@ class URL:
return self._uri_reference.host.encode("ascii")
@property
- def port(self) -> typing.Optional[int]:
+ def port(self) -> int | None:
"""
The URL port as an integer.
@@ -270,7 +270,7 @@ class URL:
return query.encode("ascii")
@property
- def params(self) -> "QueryParams":
+ def params(self) -> QueryParams:
"""
The URL query parameters, neatly parsed and packaged into an immutable
multidict representation.
@@ -338,7 +338,7 @@ class URL:
"""
return not self.is_absolute_url
- def copy_with(self, **kwargs: typing.Any) -> "URL":
+ def copy_with(self, **kwargs: typing.Any) -> URL:
"""
Copy this URL, returning a new URL with some components altered.
Accepts the same set of parameters as the components that are made
@@ -353,19 +353,19 @@ class URL:
"""
return URL(self, **kwargs)
- def copy_set_param(self, key: str, value: typing.Any = None) -> "URL":
+ def copy_set_param(self, key: str, value: typing.Any = None) -> URL:
return self.copy_with(params=self.params.set(key, value))
- def copy_add_param(self, key: str, value: typing.Any = None) -> "URL":
+ def copy_add_param(self, key: str, value: typing.Any = None) -> URL:
return self.copy_with(params=self.params.add(key, value))
- def copy_remove_param(self, key: str) -> "URL":
+ def copy_remove_param(self, key: str) -> URL:
return self.copy_with(params=self.params.remove(key))
- def copy_merge_params(self, params: QueryParamTypes) -> "URL":
+ def copy_merge_params(self, params: QueryParamTypes) -> URL:
return self.copy_with(params=self.params.merge(params))
- def join(self, url: URLTypes) -> "URL":
+ def join(self, url: URLTypes) -> URL:
"""
Return an absolute URL, using this URL as the base.
@@ -420,9 +420,7 @@ class QueryParams(typing.Mapping[str, str]):
URL query parameters, as a multi-dict.
"""
- def __init__(
- self, *args: typing.Optional[QueryParamTypes], **kwargs: typing.Any
- ) -> None:
+ def __init__(self, *args: QueryParamTypes | None, **kwargs: typing.Any) -> None:
assert len(args) < 2, "Too many arguments."
assert not (args and kwargs), "Cannot mix named and unnamed arguments."
@@ -434,7 +432,7 @@ class QueryParams(typing.Mapping[str, str]):
elif isinstance(value, QueryParams):
self._dict = {k: list(v) for k, v in value._dict.items()}
else:
- dict_value: typing.Dict[typing.Any, typing.List[typing.Any]] = {}
+ dict_value: dict[typing.Any, list[typing.Any]] = {}
if isinstance(value, (list, tuple)):
# Convert list inputs like:
# [("a", "123"), ("a", "456"), ("b", "789")]
@@ -495,7 +493,7 @@ class QueryParams(typing.Mapping[str, str]):
"""
return {k: v[0] for k, v in self._dict.items()}.items()
- def multi_items(self) -> typing.List[typing.Tuple[str, str]]:
+ def multi_items(self) -> list[tuple[str, str]]:
"""
Return all items in the query params. Allow duplicate keys to occur.
@@ -504,7 +502,7 @@ class QueryParams(typing.Mapping[str, str]):
q = httpx.QueryParams("a=123&a=456&b=789")
assert list(q.multi_items()) == [("a", "123"), ("a", "456"), ("b", "789")]
"""
- multi_items: typing.List[typing.Tuple[str, str]] = []
+ multi_items: list[tuple[str, str]] = []
for k, v in self._dict.items():
multi_items.extend([(k, i) for i in v])
return multi_items
@@ -523,7 +521,7 @@ class QueryParams(typing.Mapping[str, str]):
return self._dict[str(key)][0]
return default
- def get_list(self, key: str) -> typing.List[str]:
+ def get_list(self, key: str) -> list[str]:
"""
Get all values from the query param for a given key.
@@ -534,7 +532,7 @@ class QueryParams(typing.Mapping[str, str]):
"""
return list(self._dict.get(str(key), []))
- def set(self, key: str, value: typing.Any = None) -> "QueryParams":
+ def set(self, key: str, value: typing.Any = None) -> QueryParams:
"""
Return a new QueryParams instance, setting the value of a key.
@@ -549,7 +547,7 @@ class QueryParams(typing.Mapping[str, str]):
q._dict[str(key)] = [primitive_value_to_str(value)]
return q
- def add(self, key: str, value: typing.Any = None) -> "QueryParams":
+ def add(self, key: str, value: typing.Any = None) -> QueryParams:
"""
Return a new QueryParams instance, setting or appending the value of a key.
@@ -564,7 +562,7 @@ class QueryParams(typing.Mapping[str, str]):
q._dict[str(key)] = q.get_list(key) + [primitive_value_to_str(value)]
return q
- def remove(self, key: str) -> "QueryParams":
+ def remove(self, key: str) -> QueryParams:
"""
Return a new QueryParams instance, removing the value of a key.
@@ -579,7 +577,7 @@ class QueryParams(typing.Mapping[str, str]):
q._dict.pop(str(key), None)
return q
- def merge(self, params: typing.Optional[QueryParamTypes] = None) -> "QueryParams":
+ def merge(self, params: QueryParamTypes | None = None) -> QueryParams:
"""
Return a new QueryParams instance, updated with.
@@ -635,7 +633,7 @@ class QueryParams(typing.Mapping[str, str]):
query_string = str(self)
return f"{class_name}({query_string!r})"
- def update(self, params: typing.Optional[QueryParamTypes] = None) -> None:
+ def update(self, params: QueryParamTypes | None = None) -> None:
raise RuntimeError(
"QueryParams are immutable since 0.18.0. "
"Use `q = q.merge(...)` to create an updated copy."
diff --git a/contrib/python/httpx/httpx/_utils.py b/contrib/python/httpx/httpx/_utils.py
index bc3cb001dd..a9ece19438 100644
--- a/contrib/python/httpx/httpx/_utils.py
+++ b/contrib/python/httpx/httpx/_utils.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import codecs
import email.message
import ipaddress
@@ -27,9 +29,9 @@ _HTML5_FORM_ENCODING_RE = re.compile(
def normalize_header_key(
- value: typing.Union[str, bytes],
+ value: str | bytes,
lower: bool,
- encoding: typing.Optional[str] = None,
+ encoding: str | None = None,
) -> bytes:
"""
Coerce str/bytes into a strictly byte-wise HTTP header key.
@@ -42,9 +44,7 @@ def normalize_header_key(
return bytes_value.lower() if lower else bytes_value
-def normalize_header_value(
- value: typing.Union[str, bytes], encoding: typing.Optional[str] = None
-) -> bytes:
+def normalize_header_value(value: str | bytes, encoding: str | None = None) -> bytes:
"""
Coerce str/bytes into a strictly byte-wise HTTP header value.
"""
@@ -53,7 +53,7 @@ def normalize_header_value(
return value.encode(encoding or "ascii")
-def primitive_value_to_str(value: "PrimitiveData") -> str:
+def primitive_value_to_str(value: PrimitiveData) -> str:
"""
Coerce a primitive data type into a string value.
@@ -91,7 +91,7 @@ def format_form_param(name: str, value: str) -> bytes:
return f'{name}="{value}"'.encode()
-def get_ca_bundle_from_env() -> typing.Optional[str]:
+def get_ca_bundle_from_env() -> str | None:
if "SSL_CERT_FILE" in os.environ:
ssl_file = Path(os.environ["SSL_CERT_FILE"])
if ssl_file.is_file():
@@ -103,7 +103,7 @@ def get_ca_bundle_from_env() -> typing.Optional[str]:
return None
-def parse_header_links(value: str) -> typing.List[typing.Dict[str, str]]:
+def parse_header_links(value: str) -> list[dict[str, str]]:
"""
Returns a list of parsed link headers, for more info see:
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link
@@ -119,7 +119,7 @@ def parse_header_links(value: str) -> typing.List[typing.Dict[str, str]]:
:param value: HTTP Link entity-header field
:return: list of parsed link headers
"""
- links: typing.List[typing.Dict[str, str]] = []
+ links: list[dict[str, str]] = []
replace_chars = " '\""
value = value.strip(replace_chars)
if not value:
@@ -140,7 +140,7 @@ def parse_header_links(value: str) -> typing.List[typing.Dict[str, str]]:
return links
-def parse_content_type_charset(content_type: str) -> typing.Optional[str]:
+def parse_content_type_charset(content_type: str) -> str | None:
# We used to use `cgi.parse_header()` here, but `cgi` became a dead battery.
# See: https://peps.python.org/pep-0594/#cgi
msg = email.message.Message()
@@ -152,21 +152,21 @@ SENSITIVE_HEADERS = {"authorization", "proxy-authorization"}
def obfuscate_sensitive_headers(
- items: typing.Iterable[typing.Tuple[typing.AnyStr, typing.AnyStr]],
-) -> typing.Iterator[typing.Tuple[typing.AnyStr, typing.AnyStr]]:
+ items: typing.Iterable[tuple[typing.AnyStr, typing.AnyStr]],
+) -> typing.Iterator[tuple[typing.AnyStr, typing.AnyStr]]:
for k, v in items:
if to_str(k.lower()) in SENSITIVE_HEADERS:
v = to_bytes_or_str("[secure]", match_type_of=v)
yield k, v
-def port_or_default(url: "URL") -> typing.Optional[int]:
+def port_or_default(url: URL) -> int | None:
if url.port is not None:
return url.port
return {"http": 80, "https": 443}.get(url.scheme)
-def same_origin(url: "URL", other: "URL") -> bool:
+def same_origin(url: URL, other: URL) -> bool:
"""
Return 'True' if the given URLs share the same origin.
"""
@@ -177,7 +177,7 @@ def same_origin(url: "URL", other: "URL") -> bool:
)
-def is_https_redirect(url: "URL", location: "URL") -> bool:
+def is_https_redirect(url: URL, location: URL) -> bool:
"""
Return 'True' if 'location' is a HTTPS upgrade of 'url'
"""
@@ -192,7 +192,7 @@ def is_https_redirect(url: "URL", location: "URL") -> bool:
)
-def get_environment_proxies() -> typing.Dict[str, typing.Optional[str]]:
+def get_environment_proxies() -> dict[str, str | None]:
"""Gets proxy information from the environment"""
# urllib.request.getproxies() falls back on System
@@ -200,7 +200,7 @@ def get_environment_proxies() -> typing.Dict[str, typing.Optional[str]]:
# We don't want to propagate non-HTTP proxies into
# our configuration such as 'TRAVIS_APT_PROXY'.
proxy_info = getproxies()
- mounts: typing.Dict[str, typing.Optional[str]] = {}
+ mounts: dict[str, str | None] = {}
for scheme in ("http", "https", "all"):
if proxy_info.get(scheme):
@@ -241,11 +241,11 @@ def get_environment_proxies() -> typing.Dict[str, typing.Optional[str]]:
return mounts
-def to_bytes(value: typing.Union[str, bytes], encoding: str = "utf-8") -> bytes:
+def to_bytes(value: str | bytes, encoding: str = "utf-8") -> bytes:
return value.encode(encoding) if isinstance(value, str) else value
-def to_str(value: typing.Union[str, bytes], encoding: str = "utf-8") -> str:
+def to_str(value: str | bytes, encoding: str = "utf-8") -> str:
return value if isinstance(value, str) else value.decode(encoding)
@@ -257,13 +257,13 @@ def unquote(value: str) -> str:
return value[1:-1] if value[0] == value[-1] == '"' else value
-def guess_content_type(filename: typing.Optional[str]) -> typing.Optional[str]:
+def guess_content_type(filename: str | None) -> str | None:
if filename:
return mimetypes.guess_type(filename)[0] or "application/octet-stream"
return None
-def peek_filelike_length(stream: typing.Any) -> typing.Optional[int]:
+def peek_filelike_length(stream: typing.Any) -> int | None:
"""
Given a file-like stream object, return its length in number of bytes
without reading it into memory.
@@ -373,7 +373,7 @@ class URLPattern:
self.host = "" if url.host == "*" else url.host
self.port = url.port
if not url.host or url.host == "*":
- self.host_regex: typing.Optional[typing.Pattern[str]] = None
+ self.host_regex: typing.Pattern[str] | None = None
elif url.host.startswith("*."):
# *.example.com should match "www.example.com", but not "example.com"
domain = re.escape(url.host[2:])
@@ -387,7 +387,7 @@ class URLPattern:
domain = re.escape(url.host)
self.host_regex = re.compile(f"^{domain}$")
- def matches(self, other: "URL") -> bool:
+ def matches(self, other: URL) -> bool:
if self.scheme and self.scheme != other.scheme:
return False
if (
@@ -401,7 +401,7 @@ class URLPattern:
return True
@property
- def priority(self) -> typing.Tuple[int, int, int]:
+ def priority(self) -> tuple[int, int, int]:
"""
The priority allows URLPattern instances to be sortable, so that
we can match from most specific to least specific.
@@ -417,7 +417,7 @@ class URLPattern:
def __hash__(self) -> int:
return hash(self.pattern)
- def __lt__(self, other: "URLPattern") -> bool:
+ def __lt__(self, other: URLPattern) -> bool:
return self.priority < other.priority
def __eq__(self, other: typing.Any) -> bool:
diff --git a/contrib/python/httpx/ya.make b/contrib/python/httpx/ya.make
index 2f8eaa9087..fe32d75035 100644
--- a/contrib/python/httpx/ya.make
+++ b/contrib/python/httpx/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(0.26.0)
+VERSION(0.27.0)
LICENSE(BSD-3-Clause)
diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA
index 675a1d8825..71ddeb7b6f 100644
--- a/contrib/python/ydb/py3/.dist-info/METADATA
+++ b/contrib/python/ydb/py3/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: ydb
-Version: 3.8.0
+Version: 3.8.1
Summary: YDB Python SDK
Home-page: http://github.com/ydb-platform/ydb-python-sdk
Author: Yandex LLC
diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make
index 484d68aafd..24ba22bc0f 100644
--- a/contrib/python/ydb/py3/ya.make
+++ b/contrib/python/ydb/py3/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(3.8.0)
+VERSION(3.8.1)
LICENSE(Apache-2.0)
diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py
index 803309ff6d..98bcc5d2ec 100644
--- a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py
+++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py
@@ -202,7 +202,7 @@ class GrpcWrapperAsyncIO(IGrpcWrapperAsyncIO):
# todo handle grpc exceptions and convert it to internal exceptions
try:
grpc_message = await self.from_server_grpc.__anext__()
- except grpc.RpcError as e:
+ except (grpc.RpcError, grpc.aio.AioRpcError) as e:
raise connection._rpc_error_handler(self._connection_state, e)
issues._process_response(grpc_message)
diff --git a/contrib/python/ydb/py3/ydb/connection.py b/contrib/python/ydb/py3/ydb/connection.py
index 1c4bd9c78d..8e65cd3b83 100644
--- a/contrib/python/ydb/py3/ydb/connection.py
+++ b/contrib/python/ydb/py3/ydb/connection.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import logging
import copy
+import typing
from concurrent import futures
import uuid
import threading
@@ -61,7 +62,11 @@ def _log_request(rpc_state, request):
logger.debug("%s: request = { %s }", rpc_state, _message_to_string(request))
-def _rpc_error_handler(rpc_state, rpc_error, on_disconnected=None):
+def _rpc_error_handler(
+ rpc_state,
+ rpc_error: typing.Union[grpc.RpcError, grpc.aio.AioRpcError, grpc.Call, grpc.aio.Call],
+ on_disconnected: typing.Callable[[], None] = None,
+):
"""
RPC call error handler, that translates gRPC error into YDB issue
:param rpc_state: A state of rpc
@@ -69,7 +74,7 @@ def _rpc_error_handler(rpc_state, rpc_error, on_disconnected=None):
:param on_disconnected: a handler to call on disconnected connection
"""
logger.info("%s: received error, %s", rpc_state, rpc_error)
- if isinstance(rpc_error, grpc.Call):
+ if isinstance(rpc_error, (grpc.RpcError, grpc.aio.AioRpcError, grpc.Call, grpc.aio.Call)):
if rpc_error.code() == grpc.StatusCode.UNAUTHENTICATED:
return issues.Unauthenticated(rpc_error.details())
elif rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py
index 8173e1e609..a464e6dc07 100644
--- a/contrib/python/ydb/py3/ydb/ydb_version.py
+++ b/contrib/python/ydb/py3/ydb/ydb_version.py
@@ -1 +1 @@
-VERSION = "3.8.0"
+VERSION = "3.8.1"
diff --git a/yt/yt/client/table_client/config.cpp b/yt/yt/client/table_client/config.cpp
index d8629254d5..863672915b 100644
--- a/yt/yt/client/table_client/config.cpp
+++ b/yt/yt/client/table_client/config.cpp
@@ -285,7 +285,7 @@ void TDictionaryCompressionConfig::Register(TRegistrar registrar)
.Default(12_MB);
registrar.Parameter("max_acceptable_compression_ratio", &TThis::MaxAcceptableCompressionRatio)
.Default(0.7)
- .InRange(0, 1);
+ .GreaterThanOrEqual(0.);
registrar.Parameter("max_decompression_blob_size", &TThis::MaxDecompressionBlobSize)
.GreaterThan(0)
.Default(64_MB);
diff --git a/yt/yt/core/actions/invoker_pool.h b/yt/yt/core/actions/invoker_pool.h
index c7c6afe0a8..a467e5ce2c 100644
--- a/yt/yt/core/actions/invoker_pool.h
+++ b/yt/yt/core/actions/invoker_pool.h
@@ -61,6 +61,7 @@ public:
{
i64 EnqueuedActionCount = 0;
i64 DequeuedActionCount = 0;
+ i64 ExecutedActionCount = 0;
i64 WaitingActionCount = 0;
TDuration TotalTimeEstimate;
};
diff --git a/yt/yt/core/concurrency/fair_share_invoker_pool-inl.h b/yt/yt/core/concurrency/fair_share_invoker_pool-inl.h
new file mode 100644
index 0000000000..69be67ae68
--- /dev/null
+++ b/yt/yt/core/concurrency/fair_share_invoker_pool-inl.h
@@ -0,0 +1,40 @@
+#ifndef FAIR_SHARE_INVOKER_POOL_INL_H_
+#error "Direct inclusion of this file is not allowed, include fair_share_invoker_pool.h"
+// For the sake of sane code completion.
+#include "fair_share_invoker_pool.h"
+#endif
+
+namespace NYT::NConcurrency {
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class EInvoker>
+ requires TEnumTraits<EInvoker>::IsEnum
+TDiagnosableInvokerPoolPtr CreateEnumIndexedProfiledFairShareInvokerPool(
+ IInvokerPtr underlyingInvoker,
+ TFairShareCallbackQueueFactory callbackQueueFactory,
+ TDuration actionTimeRelevancyHalflife,
+ const TString& poolName,
+ NProfiling::IRegistryImplPtr registry)
+{
+ using TTraits = TEnumTraits<EInvoker>;
+
+ std::vector<TString> bucketNames;
+ bucketNames.reserve(TTraits::GetDomainSize());
+
+ for (const auto& enumName : TTraits::GetDomainNames()) {
+ bucketNames.emplace_back(enumName);
+ }
+
+ return CreateProfiledFairShareInvokerPool(
+ std::move(underlyingInvoker),
+ std::move(callbackQueueFactory),
+ actionTimeRelevancyHalflife,
+ poolName,
+ bucketNames,
+ std::move(registry));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NConcurrency
diff --git a/yt/yt/core/concurrency/fair_share_invoker_pool.cpp b/yt/yt/core/concurrency/fair_share_invoker_pool.cpp
index 99f231ebbd..7841446b7e 100644
--- a/yt/yt/core/concurrency/fair_share_invoker_pool.cpp
+++ b/yt/yt/core/concurrency/fair_share_invoker_pool.cpp
@@ -1,4 +1,5 @@
#include "fair_share_invoker_pool.h"
+#include "profiling_helpers.h"
#include <yt/yt/core/actions/current_invoker.h>
#include <yt/yt/core/actions/invoker_detail.h>
@@ -6,8 +7,13 @@
#include <yt/yt/core/misc/finally.h>
#include <yt/yt/core/misc/ring_queue.h>
+#include <yt/yt/core/profiling/public.h>
#include <yt/yt/core/profiling/timing.h>
+#include <yt/yt/library/profiling/sensor.h>
+
+#include <yt/yt/library/ytprof/api/api.h>
+
#include <library/cpp/yt/memory/weak_ptr.h>
#include <library/cpp/yt/threading/rw_spin_lock.h>
@@ -19,6 +25,19 @@
namespace NYT::NConcurrency {
using namespace NProfiling;
+using namespace NYTProf;
+
+////////////////////////////////////////////////////////////////////////////////
+
+constinit YT_THREAD_LOCAL(TCpuProfilerTagGuard) FairShareInvokerPoolProfilerTagGuard;
+
+////////////////////////////////////////////////////////////////////////////////
+
+#if defined(_unix_)
+ #define NO_UNIQUE_ADDRESS [[no_unique_address]]
+#else
+ #define NO_UNIQUE_ADDRESS
+#endif
////////////////////////////////////////////////////////////////////////////////
@@ -120,23 +139,207 @@ IFairShareCallbackQueuePtr CreateFairShareCallbackQueue(int bucketCount)
////////////////////////////////////////////////////////////////////////////////
+template <bool EnableProfiling>
+class TFairShareInvokerPoolProfiler
+{
+public:
+ using TObject = TFairShareInvokerPoolProfiler;
+
+ class THandle
+ {
+ public:
+ void ProfileEnqueue()
+ { }
+
+ void ProfileDequeue(TDuration /*waitTime*/)
+ { }
+
+ void ProfileExecutionFinish(TDuration /*execTime*/, TDuration /*totalTime*/)
+ { }
+ };
+
+ static TObject Create(
+ const TString& /*poolName*/,
+ std::vector<TString> /*bucketNames*/,
+ IRegistryImplPtr /*registry*/)
+ {
+ return {};
+ }
+
+ static THandle MakeHandle(const TObject& /*self*/, int /*handleIndex*/)
+ {
+ return THandle();
+ }
+};
+
+template <>
+class TFairShareInvokerPoolProfiler<true>
+ : public TRefCounted
+{
+public:
+ using TObject = TIntrusivePtr<TFairShareInvokerPoolProfiler>;
+
+ class THandle
+ {
+ public:
+ void ProfileEnqueue()
+ {
+ Profiler_->ProfileEnqueue(HandleIndex_);
+ }
+
+ void ProfileDequeue(TDuration waitTime)
+ {
+ Profiler_->ProfileDequeue(HandleIndex_, waitTime);
+ }
+
+ void ProfileExecutionFinish(TDuration execTime, TDuration totalTime)
+ {
+ Profiler_->ProfileExecutionFinish(HandleIndex_, execTime, totalTime);
+ }
+
+ private:
+ friend class TFairShareInvokerPoolProfiler;
+
+ TObject Profiler_;
+ int HandleIndex_;
+
+ THandle(TObject profiler, int handleIndex)
+ : Profiler_(std::move(profiler))
+ , HandleIndex_(handleIndex)
+ { }
+ };
+
+ static TObject Create(
+ const TString& poolName,
+ std::vector<TString> bucketNames,
+ IRegistryImplPtr registry)
+ {
+ return New<TFairShareInvokerPoolProfiler>(poolName, std::move(bucketNames), std::move(registry));
+ }
+
+ static THandle MakeHandle(const TObject& self, int handleIndex)
+ {
+ return THandle(self, handleIndex);
+ }
+
+private:
+ friend class THandle;
+
+ DECLARE_NEW_FRIEND();
+
+ std::vector<TProfilerTagPtr> BucketProfilerTags_;
+
+ struct TCounters
+ {
+ NProfiling::TCounter EnqueuedCounter;
+ NProfiling::TCounter DequeuedCounter;
+ NProfiling::TEventTimer WaitTimer;
+ NProfiling::TEventTimer ExecTimer;
+ NProfiling::TTimeCounter CumulativeTimeCounter;
+ NProfiling::TEventTimer TotalTimer;
+ std::atomic<int> ActiveCallbacks = 0;
+ };
+
+ using TCountersPtr = std::unique_ptr<TCounters>;
+
+ std::vector<TCountersPtr> Counters_;
+
+ TFairShareInvokerPoolProfiler(
+ const TString& poolName,
+ std::vector<TString> bucketNames,
+ IRegistryImplPtr registry)
+ {
+ Counters_.reserve(std::ssize(bucketNames));
+ BucketProfilerTags_.reserve(std::ssize(bucketNames));
+
+ for (const auto& bucketName : bucketNames) {
+ Counters_.push_back(CreateCounters(GetBucketTags(poolName, bucketName), registry));
+ BucketProfilerTags_.push_back(New<TProfilerTag>("bucket", bucketName));
+ }
+ }
+
+ TCountersPtr CreateCounters(const TTagSet& tagSet, const IRegistryImplPtr& registry) {
+ auto profiler = TProfiler(registry, "/fair_share_invoker_pool").WithTags(tagSet).WithHot();
+
+ auto counters = std::make_unique<TCounters>();
+ counters->EnqueuedCounter = profiler.Counter("/enqueued");
+ counters->DequeuedCounter = profiler.Counter("/dequeued");
+ counters->WaitTimer = profiler.Timer("/time/wait");
+ counters->ExecTimer = profiler.Timer("/time/exec");
+ counters->CumulativeTimeCounter = profiler.TimeCounter("/time/cumulative");
+ counters->TotalTimer = profiler.Timer("/time/total");
+
+ profiler.AddFuncGauge(
+ "/size",
+ MakeStrong(this),
+ [counters = counters.get()] {
+ return counters->ActiveCallbacks.load(std::memory_order::relaxed);
+ });
+
+ return counters;
+ }
+
+ void ProfileEnqueue(int index)
+ {
+ auto& counters = Counters_[index];
+ if (counters) {
+ counters->ActiveCallbacks.fetch_add(1, std::memory_order::relaxed);
+ counters->EnqueuedCounter.Increment();
+ }
+ }
+
+ void ProfileDequeue(int index, TDuration waitTime)
+ {
+ auto& counters = Counters_[index];
+ if (counters) {
+ counters->DequeuedCounter.Increment();
+ counters->WaitTimer.Record(waitTime);
+ }
+
+ GetTlsRef(FairShareInvokerPoolProfilerTagGuard) = TCpuProfilerTagGuard(BucketProfilerTags_[index]);
+ }
+
+ void ProfileExecutionFinish(int index, TDuration execTime, TDuration totalTime)
+ {
+ GetTlsRef(FairShareInvokerPoolProfilerTagGuard) = TCpuProfilerTagGuard{};
+
+ auto& counters = Counters_[index];
+ if (counters) {
+ counters->ExecTimer.Record(execTime);
+ counters->CumulativeTimeCounter.Add(execTime);
+ counters->TotalTimer.Record(totalTime);
+ counters->ActiveCallbacks.fetch_sub(1, std::memory_order::relaxed);
+ }
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <bool EnableProfiling>
class TFairShareInvokerPool
: public TDiagnosableInvokerPool
{
+ using TPoolProfiler = TFairShareInvokerPoolProfiler<EnableProfiling>;
+ using TPoolProfilerObject = typename TPoolProfiler::TObject;
+
public:
TFairShareInvokerPool(
IInvokerPtr underlyingInvoker,
int invokerCount,
TFairShareCallbackQueueFactory callbackQueueFactory,
- TDuration actionTimeRelevancyHalflife)
+ TDuration actionTimeRelevancyHalflife,
+ const TString& poolName = "",
+ std::vector<TString> bucketNames = {},
+ IRegistryImplPtr registry = nullptr)
: UnderlyingInvoker_(std::move(underlyingInvoker))
, Queue_(callbackQueueFactory(invokerCount))
+ , Profiler_(TPoolProfiler::Create(poolName, std::move(bucketNames), std::move(registry)))
{
Invokers_.reserve(invokerCount);
InvokerQueueStates_.reserve(invokerCount);
for (int index = 0; index < invokerCount; ++index) {
Invokers_.push_back(New<TInvoker>(UnderlyingInvoker_, index, MakeWeak(this)));
- InvokerQueueStates_.emplace_back(actionTimeRelevancyHalflife);
+ InvokerQueueStates_.emplace_back(actionTimeRelevancyHalflife, TPoolProfiler::MakeHandle(Profiler_, index));
}
}
@@ -197,16 +400,33 @@ private:
class TInvokerQueueState
{
+ using THandle = typename TPoolProfiler::THandle;
+
public:
explicit TInvokerQueueState(
- TDuration halflife)
+ TDuration halflife,
+ THandle profilerHandle)
: AverageTimeAggregator_(halflife)
+ , ProfilerHandle_(std::move(profilerHandle))
{ }
void OnActionEnqueued(TInstant now)
{
ActionEnqueueTimes_.push(now);
++EnqueuedActionCount_;
+ ProfilerHandle_.ProfileEnqueue();
+ }
+
+    //! We do not remove any enqueue times here because we
+    //! rely on the invariant that ActionEnqueueTimes_ is empty
+    //! iff no action is running.
+ void OnActionDequeued(TInstant now)
+ {
+ YT_VERIFY(!ActionEnqueueTimes_.empty());
+ ++DequeuedActionCount_;
+
+ auto waitTime = now - ActionEnqueueTimes_.front();
+ ProfilerHandle_.ProfileDequeue(waitTime);
}
//! NB: We report action after execution and not after dequeue because
@@ -216,15 +436,17 @@ private:
//! the next time we try to check if we should enqueue action.
//! This will result in more actions stuck in queue than needed
//! to determine whether or not invoker is frozen.
- void OnActionExecuted(TInstant now)
+ void OnActionExecuted(TInstant now, TInstant dequeueTime)
{
YT_VERIFY(!ActionEnqueueTimes_.empty());
- ++DequeuedActionCount_;
+ ++ExecutedActionCount_;
- auto totalWaitTime = now - ActionEnqueueTimes_.front();
+ auto execTime = now - dequeueTime;
+ auto totalTime = now - ActionEnqueueTimes_.front();
ActionEnqueueTimes_.pop();
- AverageTimeAggregator_.UpdateAt(now, totalWaitTime.MillisecondsFloat());
+ AverageTimeAggregator_.UpdateAt(now, totalTime.MillisecondsFloat());
+ ProfilerHandle_.ProfileExecutionFinish(execTime, totalTime);
}
TInvokerStatistics GetInvokerStatistics(TInstant now) const
@@ -234,6 +456,7 @@ private:
return TInvokerStatistics{
.EnqueuedActionCount = EnqueuedActionCount_,
.DequeuedActionCount = DequeuedActionCount_,
+ .ExecutedActionCount = ExecutedActionCount_,
.WaitingActionCount = waitingActionCount,
.TotalTimeEstimate = GetTotalTimeEstimate(now),
};
@@ -250,6 +473,9 @@ private:
i64 EnqueuedActionCount_ = 0;
i64 DequeuedActionCount_ = 0;
+ i64 ExecutedActionCount_ = 0;
+
+ NO_UNIQUE_ADDRESS THandle ProfilerHandle_;
TDuration GetTotalTimeEstimate(TInstant now) const
{
@@ -276,6 +502,8 @@ private:
IFairShareCallbackQueuePtr Queue_;
+ NO_UNIQUE_ADDRESS TPoolProfilerObject Profiler_;
+
class TCpuTimeAccounter
{
public:
@@ -344,6 +572,14 @@ private:
YT_VERIFY(Queue_->TryDequeue(&callback, &bucketIndex));
YT_VERIFY(IsValidInvokerIndex(bucketIndex));
+ TInstant dequeueTime = GetInstant();
+
+ if constexpr (EnableProfiling) {
+ auto guard = Guard(InvokerQueueStatesLock_);
+ auto& queueState = InvokerQueueStates_[bucketIndex];
+ queueState.OnActionDequeued(dequeueTime);
+ }
+
//! NB1: Finally causes us to count total time (wait + execution) in our statistics.
//! This is done to compensate for the following situation:
//! Consider the first task in the batch
@@ -365,7 +601,7 @@ private:
auto guard = Guard(InvokerQueueStatesLock_);
auto& queueState = InvokerQueueStates_[bucketIndex];
- queueState.OnActionExecuted(now);
+ queueState.OnActionExecuted(now, dequeueTime);
});
{
@@ -385,7 +621,7 @@ TDiagnosableInvokerPoolPtr CreateFairShareInvokerPool(
TDuration actionTimeRelevancyHalflife)
{
YT_VERIFY(0 < invokerCount && invokerCount < 100);
- return New<TFairShareInvokerPool>(
+ return New<TFairShareInvokerPool</*EnableProfiling*/ false>>(
std::move(underlyingInvoker),
invokerCount,
std::move(callbackQueueFactory),
@@ -394,4 +630,26 @@ TDiagnosableInvokerPoolPtr CreateFairShareInvokerPool(
////////////////////////////////////////////////////////////////////////////////
+TDiagnosableInvokerPoolPtr CreateProfiledFairShareInvokerPool(
+ IInvokerPtr underlyingInvoker,
+ TFairShareCallbackQueueFactory callbackQueueFactory,
+ TDuration actionTimeRelevancyHalflife,
+ const TString& poolName,
+ std::vector<TString> bucketNames,
+ IRegistryImplPtr registry)
+{
+ YT_VERIFY(0 < std::ssize(bucketNames) && std::ssize(bucketNames) < 100);
+
+ return New<TFairShareInvokerPool</*EnableProfiling*/ true>>(
+ std::move(underlyingInvoker),
+ std::ssize(bucketNames),
+ std::move(callbackQueueFactory),
+ actionTimeRelevancyHalflife,
+ poolName,
+ std::move(bucketNames),
+ std::move(registry));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NConcurrency
diff --git a/yt/yt/core/concurrency/fair_share_invoker_pool.h b/yt/yt/core/concurrency/fair_share_invoker_pool.h
index 95c3c07b97..ad3d27783f 100644
--- a/yt/yt/core/concurrency/fair_share_invoker_pool.h
+++ b/yt/yt/core/concurrency/fair_share_invoker_pool.h
@@ -57,4 +57,34 @@ TDiagnosableInvokerPoolPtr CreateFairShareInvokerPool(
////////////////////////////////////////////////////////////////////////////////
+//! Creates the invoker pool from above with invokerCount = bucketNames.size()
+//! and adds profiling on top of it.
+
+TDiagnosableInvokerPoolPtr CreateProfiledFairShareInvokerPool(
+ IInvokerPtr underlyingInvoker,
+ TFairShareCallbackQueueFactory callbackQueueFactory = CreateFairShareCallbackQueue,
+ TDuration actionTimeRelevancyHalflife = TAdjustedExponentialMovingAverage::DefaultHalflife,
+ const TString& poolName = "fair_share_invoker_pool",
+ std::vector<TString> bucketNames = {},
+ NProfiling::IRegistryImplPtr registry = nullptr);
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Same as above but bucket names are derived from EInvoker domain values.
+
+template <class EInvoker>
+ requires TEnumTraits<EInvoker>::IsEnum
+TDiagnosableInvokerPoolPtr CreateEnumIndexedProfiledFairShareInvokerPool(
+ IInvokerPtr underlyingInvoker,
+ TFairShareCallbackQueueFactory callbackQueueFactory = CreateFairShareCallbackQueue,
+ TDuration actionTimeRelevancyHalflife = TAdjustedExponentialMovingAverage::DefaultHalflife,
+ const TString& poolName = "fair_share_invoker_pool",
+ NProfiling::IRegistryImplPtr registry = nullptr);
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NConcurrency
+
+#define FAIR_SHARE_INVOKER_POOL_INL_H_
+#include "fair_share_invoker_pool-inl.h"
+#undef FAIR_SHARE_INVOKER_POOL_INL_H_
diff --git a/yt/yt/core/concurrency/unittests/profiled_fair_share_invoker_pool_ut.cpp b/yt/yt/core/concurrency/unittests/profiled_fair_share_invoker_pool_ut.cpp
new file mode 100644
index 0000000000..89b84ad4ea
--- /dev/null
+++ b/yt/yt/core/concurrency/unittests/profiled_fair_share_invoker_pool_ut.cpp
@@ -0,0 +1,908 @@
+#include <yt/yt/core/test_framework/framework.h>
+
+#include <yt/yt/core/profiling/timing.h>
+
+#include <yt/yt/core/concurrency/action_queue.h>
+#include <yt/yt/core/concurrency/delayed_executor.h>
+#include <yt/yt/core/concurrency/fair_share_invoker_pool.h>
+#include <yt/yt/core/concurrency/scheduler.h>
+#include <yt/yt/core/concurrency/thread_pool.h>
+
+#include <yt/yt/core/misc/collection_helpers.h>
+
+#include <yt/yt/core/misc/lazy_ptr.h>
+
+#include <yt/yt/library/profiling/solomon/exporter.h>
+
+#include <util/datetime/base.h>
+
+#include <algorithm>
+#include <array>
+#include <ranges>
+#include <utility>
+
+namespace NYT::NConcurrency {
+namespace {
+
+using namespace NProfiling;
+
+////////////////////////////////////////////////////////////////////////////////
+
+constexpr auto Margin = TDuration::MilliSeconds(1);
+constexpr auto Quantum = TDuration::MilliSeconds(100);
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TMockFairShareCallbackQueue
+ : public IFairShareCallbackQueue
+{
+public:
+ explicit TMockFairShareCallbackQueue(int bucketCount)
+ : UnderlyingCallbackQueue_(CreateFairShareCallbackQueue(bucketCount))
+ , TotalCpuTime_(bucketCount)
+ { }
+
+ void Enqueue(TClosure callback, int bucketIndex) override
+ {
+ UnderlyingCallbackQueue_->Enqueue(std::move(callback), bucketIndex);
+ }
+
+ bool TryDequeue(TClosure* resultCallback, int* resultBucketIndex) override
+ {
+ return UnderlyingCallbackQueue_->TryDequeue(resultCallback, resultBucketIndex);
+ }
+
+ void AccountCpuTime(int bucketIndex, NProfiling::TCpuDuration cpuTime) override
+ {
+ YT_VERIFY(IsValidBucketIndex(bucketIndex));
+ TotalCpuTime_[bucketIndex] += cpuTime;
+ UnderlyingCallbackQueue_->AccountCpuTime(bucketIndex, cpuTime);
+ }
+
+ NProfiling::TCpuDuration GetTotalCpuTime(int bucketIndex) const
+ {
+ YT_VERIFY(IsValidBucketIndex(bucketIndex));
+ return TotalCpuTime_[bucketIndex];
+ }
+
+private:
+ const IFairShareCallbackQueuePtr UnderlyingCallbackQueue_;
+ std::vector<std::atomic<NProfiling::TCpuDuration>> TotalCpuTime_;
+
+ bool IsValidBucketIndex(int bucketIndex) const
+ {
+ return 0 <= bucketIndex && bucketIndex < std::ssize(TotalCpuTime_);
+ }
+};
+
+using TMockFairShareCallbackQueuePtr = TIntrusivePtr<TMockFairShareCallbackQueue>;
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TProfiledFairShareInvokerPoolTest
+ : public ::testing::Test
+{
+protected:
+ std::array<TLazyIntrusivePtr<TActionQueue>, 2> Queues_;
+
+ THashMap<IInvoker*, int> InvokerToIndex_;
+
+ TMockFairShareCallbackQueuePtr MockCallbackQueue;
+
+ struct TInvocationOrder
+ {
+ YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, Lock_);
+ std::vector<int> InvokerIndexes_;
+ } InvocationOrder_;
+
+ void TearDown() override
+ {
+ for (int i = 0; i < std::ssize(Queues_); ++i) {
+ if (Queues_[i]) {
+ Queues_[i]->Shutdown();
+ }
+ }
+ }
+
+ template <typename TInvokerPoolPtr>
+ void InitializeInvokerToIndexMapping(const TInvokerPoolPtr& invokerPool, int invokerCount)
+ {
+ YT_VERIFY(invokerCount > 0);
+ InvokerToIndex_.clear();
+ for (int i = 0; i < invokerCount; ++i) {
+ auto invoker = invokerPool->GetInvoker(i);
+ InvokerToIndex_[invoker.Get()] = i;
+ }
+ }
+
+ int GetInvokerIndex(IInvoker* invokerAddress) const
+ {
+ return GetOrCrash(InvokerToIndex_, invokerAddress);
+ }
+
+ int GetCurrentInvokerIndex() const
+ {
+ return GetInvokerIndex(GetCurrentInvoker());
+ }
+
+ void ClearInvocationOrder()
+ {
+ auto guard = Guard(InvocationOrder_.Lock_);
+ InvocationOrder_.InvokerIndexes_.clear();
+ }
+
+ void PushInvokerIndexToInvocationOrder()
+ {
+ auto currentInvokerIndex = GetCurrentInvokerIndex();
+ auto guard = Guard(InvocationOrder_.Lock_);
+ InvocationOrder_.InvokerIndexes_.push_back(currentInvokerIndex);
+ }
+
+ std::vector<int> GetInvocationOrder()
+ {
+ auto guard = Guard(InvocationOrder_.Lock_);
+ return InvocationOrder_.InvokerIndexes_;
+ }
+
+ TDiagnosableInvokerPoolPtr CreateInvokerPool(IInvokerPtr underlyingInvoker, int invokerCount, TSolomonRegistryPtr registry = nullptr)
+ {
+ std::vector<TString> bucketNames;
+
+ for (int i = 0; i < invokerCount; ++i) {
+ bucketNames.push_back("invoker_" + ToString(i));
+ }
+ auto result = CreateProfiledFairShareInvokerPool(
+ std::move(underlyingInvoker),
+ [this] (int bucketCount) {
+ YT_VERIFY(bucketCount > 0);
+ MockCallbackQueue = New<TMockFairShareCallbackQueue>(bucketCount);
+ return MockCallbackQueue;
+ },
+ TAdjustedExponentialMovingAverage::DefaultHalflife,
+ "TestInvokerPool",
+ bucketNames,
+ std::move(registry));
+ InitializeInvokerToIndexMapping(result, invokerCount);
+ return result;
+ }
+
+ void ExpectInvokerIndex(int invokerIndex)
+ {
+ EXPECT_EQ(invokerIndex, GetCurrentInvokerIndex());
+ }
+
+ void ExpectTotalCpuTime(int bucketIndex, TDuration expectedCpuTime, TDuration precision = Quantum / 2)
+ {
+ // Push dummy callback to the scheduler queue and synchronously wait for it
+ // to ensure that all possible CPU time accounters were destroyed during fiber stack unwinding.
+ for (int i = 0; i < std::ssize(Queues_); ++i) {
+ if (Queues_[i]) {
+ auto invoker = Queues_[i]->GetInvoker();
+ BIND([] { }).AsyncVia(invoker).Run().Get().ThrowOnError();
+ }
+ }
+
+ auto precisionValue = NProfiling::DurationToValue(precision);
+ auto expectedValue = NProfiling::DurationToValue(expectedCpuTime);
+ auto actualValue = NProfiling::CpuDurationToValue(MockCallbackQueue->GetTotalCpuTime(bucketIndex));
+ EXPECT_GT(precisionValue, std::abs(expectedValue - actualValue));
+ }
+
+ void DoTestFairness(IInvokerPoolPtr invokerPool, int invokerCount)
+ {
+ YT_VERIFY(1 < invokerCount && invokerCount < 5);
+
+ // Each invoker executes some number of callbacks of the same duration |Quantum * (2 ^ #invokerIndex)|.
+        // The individual callback duration and the number of callbacks are chosen
+        // such that the total duration is the same for all invokers.
+ auto getWeight = [] (int invokerIndex) {
+ return (1 << invokerIndex);
+ };
+ auto getSpinDuration = [getWeight] (int invokerIndex) {
+ return Quantum * getWeight(invokerIndex);
+ };
+ auto getCallbackCount = [getWeight, invokerCount] (int invokerIndex) {
+ // Weights are supposed to be in the ascending order.
+ return 4 * getWeight(invokerCount - 1) / getWeight(invokerIndex);
+ };
+
+ std::vector<TFuture<void>> futures;
+ for (int i = 0; i < invokerCount; ++i) {
+ for (int j = 0, callbackCount = getCallbackCount(i); j < callbackCount; ++j) {
+ futures.push_back(
+ BIND([this, spinDuration = getSpinDuration(i)] {
+ PushInvokerIndexToInvocationOrder();
+ Spin(spinDuration);
+ }).AsyncVia(invokerPool->GetInvoker(i)).Run());
+ }
+ }
+
+ AllSucceeded(futures).Get().ThrowOnError();
+
+ auto invocationOrder = GetInvocationOrder();
+
+ // Test is considered successful if at any moment of the execution
+ // deviation of the weighted count of executed callbacks per invoker
+ // is not greater than the threshold (see in the code below).
+ std::vector<int> invocationCount(invokerCount);
+ for (auto invokerIndex : invocationOrder) {
+ YT_VERIFY(0 <= invokerIndex && invokerIndex < invokerCount);
+
+ ++invocationCount[invokerIndex];
+
+ auto getWeightedInvocationCount = [getWeight, &invocationCount] (int invokerIndex) {
+ return invocationCount[invokerIndex] * getWeight(invokerIndex);
+ };
+
+ auto minWeightedInvocationCount = getWeightedInvocationCount(0);
+ auto maxWeightedInvocationCount = minWeightedInvocationCount;
+ for (int i = 0; i < invokerCount; ++i) {
+ auto weightedInvocationCount = getWeightedInvocationCount(i);
+ minWeightedInvocationCount = std::min(minWeightedInvocationCount, weightedInvocationCount);
+ maxWeightedInvocationCount = std::max(maxWeightedInvocationCount, weightedInvocationCount);
+ }
+
+ // Compare threshold and deviation.
+ EXPECT_GE(getWeight(invokerCount - 1), maxWeightedInvocationCount - minWeightedInvocationCount);
+ }
+
+ for (int i = 0; i < invokerCount; ++i) {
+ EXPECT_EQ(getCallbackCount(i), invocationCount[i]);
+ }
+ }
+
+ void DoTestFairness(int invokerCount)
+ {
+ DoTestFairness(
+ CreateInvokerPool(Queues_[0]->GetInvoker(), invokerCount),
+ invokerCount);
+ }
+
+ void DoTestSwitchTo(int switchToCount)
+ {
+ YT_VERIFY(switchToCount > 0);
+
+ auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), switchToCount + 1);
+
+ auto callback = BIND([this, invokerPool, switchToCount] () {
+ for (int i = 1; i <= switchToCount; ++i) {
+ ExpectInvokerIndex(i - 1);
+ Spin(Quantum * i);
+ SwitchTo(invokerPool->GetInvoker(i));
+ }
+ ExpectInvokerIndex(switchToCount);
+ Spin(Quantum * (switchToCount + 1));
+ }).AsyncVia(invokerPool->GetInvoker(0));
+
+ callback.Run().Get().ThrowOnError();
+
+ for (int i = 0; i <= switchToCount; ++i) {
+ ExpectTotalCpuTime(i, Quantum * (i + 1));
+ }
+ }
+
+ void DoTestWaitFor(int waitForCount)
+ {
+ YT_VERIFY(waitForCount > 0);
+
+ auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), 2);
+
+ auto callback = BIND([waitForCount] {
+ Spin(Quantum);
+ for (int i = 0; i < waitForCount; ++i) {
+ TDelayedExecutor::WaitForDuration(Quantum);
+ Spin(Quantum);
+ }
+ }).AsyncVia(invokerPool->GetInvoker(0));
+
+ callback.Run().Get().ThrowOnError();
+
+ ExpectTotalCpuTime(0, Quantum * (waitForCount + 1));
+ ExpectTotalCpuTime(1, TDuration::Zero());
+ }
+
+ static void Spin(TDuration duration)
+ {
+ NProfiling::TFiberWallTimer timer;
+ while (timer.GetElapsedTime() < duration) {
+ }
+ }
+};
+
+TEST_F(TProfiledFairShareInvokerPoolTest, Fairness2)
+{
+ DoTestFairness(2);
+}
+
+TEST_F(TProfiledFairShareInvokerPoolTest, Fairness3)
+{
+ DoTestFairness(3);
+}
+
+TEST_F(TProfiledFairShareInvokerPoolTest, Fairness4)
+{
+ DoTestFairness(4);
+}
+
+TEST_F(TProfiledFairShareInvokerPoolTest, SwitchTo12)
+{
+ DoTestSwitchTo(1);
+}
+
+TEST_F(TProfiledFairShareInvokerPoolTest, SwitchTo123)
+{
+ DoTestSwitchTo(2);
+}
+
+TEST_F(TProfiledFairShareInvokerPoolTest, SwitchTo1234)
+{
+ DoTestSwitchTo(3);
+}
+
+TEST_F(TProfiledFairShareInvokerPoolTest, SwitchTo121)
+{
+ auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), 2);
+
+ auto callback = BIND([this, invokerPool] {
+ SwitchTo(invokerPool->GetInvoker(0));
+ ExpectInvokerIndex(0);
+ Spin(Quantum);
+
+ SwitchTo(invokerPool->GetInvoker(1));
+ ExpectInvokerIndex(1);
+ Spin(Quantum * 3);
+
+ SwitchTo(invokerPool->GetInvoker(0));
+ ExpectInvokerIndex(0);
+ Spin(Quantum);
+ }).AsyncVia(invokerPool->GetInvoker(0));
+
+ callback.Run().Get().ThrowOnError();
+
+ ExpectTotalCpuTime(0, Quantum * 2);
+ ExpectTotalCpuTime(1, Quantum * 3);
+}
+
+TEST_F(TProfiledFairShareInvokerPoolTest, SwitchTo111AndSwitchTo222)
+{
+ auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), 2);
+
+ std::vector<TFuture<void>> futures;
+
+ futures.push_back(
+ BIND([this] {
+ ExpectInvokerIndex(0);
+ Spin(Quantum);
+ SwitchTo(GetCurrentInvoker());
+
+ ExpectInvokerIndex(0);
+ Spin(Quantum);
+ SwitchTo(GetCurrentInvoker());
+
+ ExpectInvokerIndex(0);
+ Spin(Quantum);
+ }).AsyncVia(invokerPool->GetInvoker(0)).Run());
+
+ futures.push_back(
+ BIND([this] {
+ ExpectInvokerIndex(1);
+ Spin(Quantum);
+ SwitchTo(GetCurrentInvoker());
+
+ ExpectInvokerIndex(1);
+ Spin(Quantum);
+ SwitchTo(GetCurrentInvoker());
+
+ ExpectInvokerIndex(1);
+ Spin(Quantum);
+ }).AsyncVia(invokerPool->GetInvoker(1)).Run());
+
+ AllSucceeded(futures).Get().ThrowOnError();
+
+ ExpectTotalCpuTime(0, Quantum * 3);
+ ExpectTotalCpuTime(1, Quantum * 3);
+}
+
+TEST_F(TProfiledFairShareInvokerPoolTest, WaitFor1)
+{
+ DoTestWaitFor(1);
+}
+
+TEST_F(TProfiledFairShareInvokerPoolTest, WaitFor2)
+{
+ DoTestWaitFor(2);
+}
+
+TEST_F(TProfiledFairShareInvokerPoolTest, WaitFor3)
+{
+ DoTestWaitFor(3);
+}
+
+TEST_F(TProfiledFairShareInvokerPoolTest, CpuTimeAccountingBetweenContextSwitchesIsNotSupportedYet)
+{
+ auto threadPool = CreateThreadPool(2, "ThreadPool");
+ auto invokerPool = CreateInvokerPool(threadPool->GetInvoker(), 2);
+
+ NThreading::TEvent started;
+
+ // Start busy loop in the first thread via first fair share invoker.
+ auto future = BIND([this, &started] {
+ Spin(Quantum * 10);
+
+ auto invocationOrder = GetInvocationOrder();
+ EXPECT_TRUE(invocationOrder.empty());
+
+ started.NotifyOne();
+
+ Spin(Quantum * 50);
+
+ invocationOrder = GetInvocationOrder();
+ EXPECT_TRUE(!invocationOrder.empty());
+ }).AsyncVia(invokerPool->GetInvoker(0)).Run();
+
+ YT_VERIFY(started.Wait(Quantum * 100));
+
+ // After 10 quantums of time (see notification of the #started variable) we start Fairness test in the second thread.
+    // With a better implementation we would expect a non-fair CPU time distribution between the first and second invokers,
+    // because the first invoker is given more CPU time in the first thread (at least within a margin of 10 quantums).
+    // But CPU accounting is not supported for running callbacks, so we expect the Fairness test to pass.
+ DoTestFairness(invokerPool, 2);
+
+ future.Get().ThrowOnError();
+}
+
+TEST_F(TProfiledFairShareInvokerPoolTest, GetTotalWaitTimeEstimateEmptyPool)
+{
+ auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), 1);
+
+ EXPECT_EQ(TDuration::Zero(), invokerPool->GetInvokerStatistics(0).TotalTimeEstimate);
+
+ WaitFor(BIND([] {
+ Spin(Quantum);
+ }).AsyncVia(invokerPool->GetInvoker(0)).Run()).ThrowOnError();
+
+ EXPECT_LE(invokerPool->GetInvokerStatistics(0).TotalTimeEstimate, Quantum + Margin);
+}
+
+TEST_F(TProfiledFairShareInvokerPoolTest, GetTotalWaitTimeEstimateStuckAction)
+{
+ auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), 1);
+ NThreading::TEvent event;
+
+ auto action = BIND([&event]{
+ event.Wait(TDuration::Seconds(100));
+ })
+ .AsyncVia(invokerPool->GetInvoker(0))
+ .Run();
+
+ TDelayedExecutor::WaitForDuration(Quantum);
+
+ auto totalTimeEstimate = invokerPool->GetInvokerStatistics(0).TotalTimeEstimate;
+
+ EXPECT_LE(totalTimeEstimate, Quantum + Margin);
+ EXPECT_GE(totalTimeEstimate, Quantum - Margin);
+
+ event.NotifyAll();
+ WaitFor(std::move(action)).ThrowOnError();
+}
+
+TEST_F(TProfiledFairShareInvokerPoolTest, GetTotalWaitTimeEstimateRelevancyDecay)
+{
+ auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), 1);
+ // Make aggregator very forgetful.
+ invokerPool->UpdateActionTimeRelevancyHalflife(TDuration::Zero());
+ NThreading::TEvent event;
+
+ auto action = BIND([&event]{
+ event.Wait(100 * Quantum);
+ })
+ .AsyncVia(invokerPool->GetInvoker(0))
+ .Run();
+
+ TDelayedExecutor::WaitForDuration(Quantum);
+
+ auto totalTimeEstimate = invokerPool->GetInvokerStatistics(0).TotalTimeEstimate;
+
+ EXPECT_LE(totalTimeEstimate, Quantum + Margin);
+ EXPECT_GE(totalTimeEstimate, Quantum - Margin);
+
+ event.NotifyAll();
+ WaitFor(std::move(action)).ThrowOnError();
+
+ TDelayedExecutor::WaitForDuration(Quantum);
+
+ EXPECT_LE(invokerPool->GetInvokerStatistics(0).TotalTimeEstimate, Margin);
+}
+
+TEST_F(TProfiledFairShareInvokerPoolTest, GetTotalWaitTimeEstimateSeveralActions)
+{
+ static constexpr int ActionCount = 3;
+
+ auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), 1);
+ // Make aggregator never forget a sample.
+ invokerPool->UpdateActionTimeRelevancyHalflife(TDuration::Days(100000000000000000));
+
+ std::vector<NThreading::TEvent> leashes(ActionCount);
+ std::vector<TFuture<void>> actions;
+
+ for (int idx = 0; idx < ActionCount; ++idx) {
+ actions.emplace_back(BIND([&leashes, idx] {
+ leashes[idx].Wait(100 * Quantum);
+ })
+ .AsyncVia(invokerPool->GetInvoker(0))
+ .Run());
+ }
+
+ auto expectedTotalTimeEstimate = TDuration::Zero();
+ auto start = GetInstant();
+
+ for (int idx = 0; idx < ActionCount; ++idx) {
+ TDelayedExecutor::WaitForDuration(Quantum);
+
+ auto statistics = invokerPool->GetInvokerStatistics(0);
+ auto expectedTotalTime = GetInstant() - start;
+
+ if (idx == 0) {
+ expectedTotalTimeEstimate = expectedTotalTime;
+ } else {
+ expectedTotalTimeEstimate = (expectedTotalTimeEstimate * idx + expectedTotalTime) / (idx + 1.0);
+ }
+
+ EXPECT_EQ(statistics.WaitingActionCount, ActionCount - idx);
+
+ EXPECT_LE(statistics.TotalTimeEstimate, expectedTotalTimeEstimate + Margin)
+ << TError("Index: %v.", idx).GetMessage();
+ EXPECT_GE(statistics.TotalTimeEstimate, expectedTotalTimeEstimate - Margin)
+ << TError("Index: %v.", idx).GetMessage();
+
+ leashes[idx].NotifyOne();
+ WaitFor(std::move(actions[idx])).ThrowOnError();
+ }
+}
+
+TEST_F(TProfiledFairShareInvokerPoolTest, GetTotalWaitEstimateUncorrelatedWithOtherInvokers)
+{
+    // Checks that wait-time statistics reported for invoker 0 are independent
+    // of the activity of invoker 1, even though both share one fair-share queue.
+    //
+    // Asserts that callbacks run in the exact global order 0, 1, 2, ...
+    // across both invokers.
+    auto executionOrderEnforcer = [] (int suggestedStep) {
+        // NOTE: function-local static — the counter is not reset between
+        // repeated runs of this test within one process.
+        static int realStep = 0;
+        EXPECT_EQ(realStep, suggestedStep);
+        ++realStep;
+    };
+    auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), 2);
+    // Make aggregator never forget a sample.
+    invokerPool->UpdateActionTimeRelevancyHalflife(TDuration::Days(100000000000000000));
+
+    std::vector<NThreading::TEvent> leashes(2);
+    std::vector<TFuture<void>> actions;
+
+    // Two actions in invoker 0; each blocks on its leash until released below.
+    for (int idx = 0; idx < 2; ++idx) {
+        actions.emplace_back(BIND([&executionOrderEnforcer, &leashes, idx] {
+            if (idx == 0) {
+                executionOrderEnforcer(0);
+            } else {
+                // The second action of invoker 0 must run only after the
+                // secondary invoker's action (global step 1) has started.
+                executionOrderEnforcer(2);
+            }
+            leashes[idx].Wait(100 * Quantum);
+        })
+        .AsyncVia(invokerPool->GetInvoker(0))
+        .Run());
+    }
+
+    // One blocking action in invoker 1, interleaved between the two above.
+    NThreading::TEvent secondaryLeash;
+    auto secondaryAction = BIND([&executionOrderEnforcer, &secondaryLeash] {
+        executionOrderEnforcer(1);
+        secondaryLeash.Wait(100 * Quantum);
+    }).AsyncVia(invokerPool->GetInvoker(1)).Run();
+
+    auto start = GetInstant();
+
+    TDelayedExecutor::WaitForDuration(Quantum);
+
+    auto statistics = invokerPool->GetInvokerStatistics(0);
+    auto expectedTotalTimeEstimate = GetInstant() - start;
+
+    EXPECT_EQ(statistics.WaitingActionCount, 2);
+    EXPECT_LE(statistics.TotalTimeEstimate, expectedTotalTimeEstimate + Margin);
+    EXPECT_GE(statistics.TotalTimeEstimate, expectedTotalTimeEstimate - Margin);
+
+    leashes[0].NotifyOne();
+    WaitFor(std::move(actions[0])).ThrowOnError();
+
+    // Second action will not be executed until the secondary action is released.
+
+    leashes[1].NotifyOne();
+    TDelayedExecutor::WaitForDuration(10 * Quantum);
+    EXPECT_FALSE(actions[1].IsSet());
+
+    // Release Secondary action.
+
+    // Invoker 1's statistics reflect only its own (still waiting) action.
+    auto secondaryStatistics = invokerPool->GetInvokerStatistics(1);
+    auto secondaryWaitTime = GetInstant() - start;
+
+    EXPECT_EQ(secondaryStatistics.WaitingActionCount, 1);
+    EXPECT_LE(secondaryStatistics.TotalTimeEstimate, secondaryWaitTime + Margin);
+    EXPECT_GE(secondaryStatistics.TotalTimeEstimate, secondaryWaitTime - Margin);
+
+    secondaryLeash.NotifyOne();
+    WaitFor(std::move(secondaryAction)).ThrowOnError();
+    WaitFor(std::move(actions[1])).ThrowOnError();
+
+    statistics = invokerPool->GetInvokerStatistics(0);
+    // NOTE(review): presumably averages three samples seen by invoker 0's
+    // aggregator (the two earlier waits plus the final one) — confirm against
+    // the aggregator implementation.
+    expectedTotalTimeEstimate = (expectedTotalTimeEstimate + (GetInstant() - start)) / 3.0;
+
+    EXPECT_EQ(statistics.WaitingActionCount, 0);
+    EXPECT_LE(statistics.TotalTimeEstimate, expectedTotalTimeEstimate + Margin);
+    EXPECT_GE(statistics.TotalTimeEstimate, expectedTotalTimeEstimate - Margin);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// TL;DR:
+// 1) We make a solomon exporter and a profiled queue with a custom registry so we can read counters.
+// 2) We enqueue a bunch of actions into each invoker and wait for them to complete.
+// 3) We read JSON from the exporter and check that every sensor and tagset is present.
+// 4) We convert the JSON to YSON and extract the list of sensors.
+// 5) We check that:
+//    5.1) There is exactly one tagless enqueue/dequeue sensor whose "value" equals the total number of actions;
+//    5.2) There is exactly one enqueue/dequeue sensor with the tag "thread":threadName
+//         whose "value" equals the total number of actions;
+//    5.3) There is exactly one enqueue/dequeue sensor for each tagset {"thread":threadName, "bucket":"invoker_i"}
+//         whose "value" equals i, for each i from 0 to totalInvokerCount - 1.
+class TProfiledFairShareInvokerPoolProfilingTest
+    : public TProfiledFairShareInvokerPoolTest
+{
+public:
+    // Exporter configuration with a one-second grid step so the sensors
+    // produced during the test become readable quickly.
+    TSolomonExporterConfigPtr CreateExporterConfig()
+    {
+        auto config = New<TSolomonExporterConfig>();
+        config->GridStep = TDuration::Seconds(1);
+        config->EnableCoreProfilingCompatibility = true;
+        config->EnableSelfProfiling = false;
+
+        return config;
+    }
+
+    // Runs |func| via |invoker| and blocks until it completes,
+    // rethrowing any error.
+    template <class F>
+    void SyncRunCallback(IInvokerPtr invoker, F&& func)
+    {
+        WaitFor(BIND(std::forward<F>(func)).AsyncVia(invoker).Run()).ThrowOnError();
+    }
+
+    // Turns the exporter's JSON dump into parseable YSON by a crude
+    // character substitution (':' -> '=', ',' -> ';') and returns the
+    // children of the top-level "sensors" list.
+    auto GetSensors(TString json)
+    {
+        for (auto& c : json) {
+            if (c == ':') {
+                c = '=';
+            } else if (c == ',') {
+                c = ';';
+            }
+        }
+
+        auto yson = NYson::TYsonString(json);
+
+        auto list = NYTree::ConvertToNode(yson)->AsMap()->FindChild("sensors");
+
+        EXPECT_TRUE(list);
+
+        return list->AsList()->GetChildren();
+    }
+
+    // Checks that every expected sensor name and tag occurs somewhere in the
+    // raw JSON. Implementation detail: if a sensor is present for one tagset,
+    // it is present for all of them, so per-tagset presence is not verified here.
+    void VerifyCountersArePresent(int invokerCount, TString json)
+    {
+        TString sensorPrefix = "\"sensor\":\"yt.fair_share_invoker_pool";
+        TString bucketPrefix = "\"bucket\":\"invoker_";
+        TString threadName = "\"thread\":\"TestInvokerPool\"";
+
+        auto checker = [&] (const TString& pattern) {
+            EXPECT_TRUE(json.Contains(pattern))
+                << TError("Pattern %v is missing", pattern).GetMessage();
+        };
+
+        checker(sensorPrefix + ".size\"");
+        checker(sensorPrefix + ".dequeued\"");
+        checker(sensorPrefix + ".enqueued\"");
+        checker(sensorPrefix + ".time.wait.max\"");
+        checker(sensorPrefix + ".time.exec.max\"");
+        checker(sensorPrefix + ".time.cumulative\"");
+        checker(sensorPrefix + ".time.total.max\"");
+        checker(threadName);
+
+        for (int i = 0; i < invokerCount; ++i) {
+            checker(bucketPrefix + ToString(i) + "\"");
+        }
+    }
+
+    // Validates the enqueued/dequeued sensor values for every tagset:
+    // per-bucket sensors must report exactly the number of actions run in
+    // that bucket, while thread-only and tagless sensors must report the
+    // grand total, each appearing exactly once.
+    void VerifyJson(int invokerCount, TString json)
+    {
+        VerifyCountersArePresent(invokerCount, json);
+
+        // Expected per-bucket counts: invoker_i ran exactly i actions
+        // (see TestProfiler below).
+        THashMap<TString, int> invokerNameToEnqueued;
+        bool taglessEnqueuedPresent = false;
+        bool taglessDequeuedPresent = false;
+        bool threadOnlyTagEnqueuedPresent = false;
+        bool threadOnlyTagDequeuedPresent = false;
+
+        int totalActions = 0;
+
+        for (int i = 0; i < invokerCount; ++i) {
+            invokerNameToEnqueued.emplace("invoker_" + ToString(i), i);
+            totalActions += i;
+        }
+
+        THashMap<TString, int> invokerNameToDequeued = invokerNameToEnqueued;
+
+        for (const auto& entry : GetSensors(json)) {
+            auto mapEntry = entry->AsMap();
+            auto labels = mapEntry->FindChild("labels")->AsMap();
+
+            auto sensor = labels->FindChildValue<TString>("sensor");
+
+            // Only the enqueued/dequeued counters are value-checked here.
+            if (!sensor ||
+                !(sensor == "yt.fair_share_invoker_pool.dequeued" ||
+                sensor == "yt.fair_share_invoker_pool.enqueued"))
+            {
+                continue;
+            }
+
+            auto value = mapEntry->FindChildValue<int>("value");
+            EXPECT_TRUE(value);
+
+            auto& invokerNameToValue =
+                sensor == "yt.fair_share_invoker_pool.enqueued" ?
+                invokerNameToEnqueued :
+                invokerNameToDequeued;
+
+            if (auto threadName = labels->FindChildValue<TString>("thread")) {
+                EXPECT_EQ(threadName, "TestInvokerPool");
+
+                if (auto bucketName = labels->FindChildValue<TString>("bucket")) {
+                    EXPECT_TRUE(bucketName->StartsWith("invoker_"));
+
+                    // Erasing the bucket ensures each tagset appears exactly once.
+                    EXPECT_TRUE(invokerNameToValue.contains(*bucketName));
+                    EXPECT_EQ(invokerNameToValue[*bucketName], *value);
+                    invokerNameToValue.erase(*bucketName);
+
+                    continue;
+                }
+
+                // Thread-only tagset: must occur exactly once per counter.
+                if (sensor == "yt.fair_share_invoker_pool.enqueued") {
+                    EXPECT_FALSE(std::exchange(threadOnlyTagEnqueuedPresent, true));
+                } else {
+                    EXPECT_FALSE(std::exchange(threadOnlyTagDequeuedPresent, true));
+                }
+            } else {
+                // Tagless sensor: must occur exactly once per counter.
+                if (sensor == "yt.fair_share_invoker_pool.enqueued") {
+                    EXPECT_FALSE(std::exchange(taglessEnqueuedPresent, true));
+                } else {
+                    EXPECT_FALSE(std::exchange(taglessDequeuedPresent, true));
+                }
+            }
+
+            // Thread-only and tagless sensors aggregate over all buckets.
+            EXPECT_EQ(value, totalActions);
+        }
+
+        EXPECT_TRUE(threadOnlyTagEnqueuedPresent);
+        EXPECT_TRUE(threadOnlyTagDequeuedPresent);
+        EXPECT_TRUE(taglessEnqueuedPresent);
+        EXPECT_TRUE(taglessDequeuedPresent);
+
+        // Every expected bucket tagset must have been seen (and erased).
+        EXPECT_TRUE(invokerNameToEnqueued.empty());
+        EXPECT_TRUE(invokerNameToDequeued.empty());
+    }
+
+    // Drives the end-to-end scenario: runs i actions in invoker i
+    // (i in [0, invokerCount)), lets the exporter collect the sensors,
+    // then verifies the exported JSON.
+    void TestProfiler(int invokerCount)
+    {
+        auto registry = New<TSolomonRegistry>();
+        auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), invokerCount, registry);
+
+        auto config = CreateExporterConfig();
+        auto exporter = New<TSolomonExporter>(config, registry);
+
+        exporter->Start();
+
+        for (int invokerIdx = 0; invokerIdx < invokerCount; ++invokerIdx) {
+            for (int actionIndex = 0; actionIndex < invokerIdx; ++actionIndex) {
+                SyncRunCallback(invokerPool->GetInvoker(invokerIdx), []{});
+            }
+        }
+
+        // Give the exporter several grid steps to pick up the counters.
+        Sleep(TDuration::Seconds(5));
+
+        auto json = exporter->ReadJson();
+        ASSERT_TRUE(json);
+
+        exporter->Stop();
+
+        VerifyJson(invokerCount, std::move(*json));
+    }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST_F(TProfiledFairShareInvokerPoolProfilingTest, TestProfilerGenericFairShareInvokerPool1)
+{
+    // Degenerate pool: a single bucket (and hence zero actions run).
+    TestProfiler(1);
+}
+
+TEST_F(TProfiledFairShareInvokerPoolProfilingTest, TestProfilerGenericFairShareInvokerPool2)
+{
+    // Two buckets.
+    TestProfiler(2);
+}
+
+TEST_F(TProfiledFairShareInvokerPoolProfilingTest, TestProfilerGenericFairShareInvokerPool4)
+{
+    // Four buckets.
+    TestProfiler(4);
+}
+
+TEST_F(TProfiledFairShareInvokerPoolProfilingTest, TestProfilerGenericFairShareInvokerPool10)
+{
+    // Ten buckets.
+    TestProfiler(10);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Named buckets for the enum-indexed profiled invoker pool test below.
+DEFINE_ENUM(EInvokerBuckets,
+    ((Apple) (1))
+    ((Pear) (2))
+    ((Orange) (3))
+    ((Watermelon) (4))
+);
+
+TEST_F(TProfiledFairShareInvokerPoolProfilingTest, TestEnumIndexedProfilerGenericFairShareInvokerPool)
+{
+    auto registry = New<TSolomonRegistry>();
+    // Enum-indexed flavor of the pool: one bucket per EInvokerBuckets value.
+    auto invokerPool = CreateEnumIndexedProfiledFairShareInvokerPool<EInvokerBuckets>(
+        Queues_[0]->GetInvoker(),
+        [this] (int bucketCount) {
+            YT_VERIFY(bucketCount > 0);
+            MockCallbackQueue = New<TMockFairShareCallbackQueue>(bucketCount);
+            return MockCallbackQueue;
+        },
+        TAdjustedExponentialMovingAverage::DefaultHalflife,
+        "TestInvokerPool",
+        registry);
+
+    auto config = CreateExporterConfig();
+    auto exporter = New<TSolomonExporter>(config, registry);
+
+    exporter->Start();
+
+    // Run exactly one action in every bucket.
+    for (int invokerIdx = 0; invokerIdx < TEnumTraits<EInvokerBuckets>::GetDomainSize(); ++invokerIdx) {
+        SyncRunCallback(invokerPool->GetInvoker(invokerIdx), []{});
+    }
+
+    // Give the exporter several grid steps to pick up the counters.
+    Sleep(TDuration::Seconds(5));
+
+    auto json = exporter->ReadJson();
+    ASSERT_TRUE(json);
+
+    exporter->Stop();
+
+    // Every enum value must appear as a "bucket" tag, and each bucket must
+    // report exactly one dequeued action.
+    TEnumIndexedArray<EInvokerBuckets, bool> mentions;
+    std::ranges::fill(mentions, false);
+
+    for (const auto& sensor : GetSensors(*json)) {
+        auto labels = sensor->AsMap()->FindChild("labels")->AsMap();
+
+        if (auto bucketName = labels->FindChildValue<TString>("bucket")) {
+            mentions[TEnumTraits<EInvokerBuckets>::FromString(*bucketName)] = true;
+
+            if (auto sensorName = labels->FindChildValue<TString>("sensor");
+                sensorName &&
+                sensorName == "yt.fair_share_invoker_pool.dequeued")
+            {
+                auto value = sensor->AsMap()->FindChildValue<int>("value");
+
+                EXPECT_TRUE(value);
+                EXPECT_EQ(*value, 1);
+            }
+        }
+    }
+
+    EXPECT_TRUE(std::ranges::all_of(mentions, std::identity{}));
+}
+
+} // namespace
+} // namespace NYT::NConcurrency
diff --git a/yt/yt/core/concurrency/unittests/ya.make b/yt/yt/core/concurrency/unittests/ya.make
index d1d86adfe5..b8cad1eafd 100644
--- a/yt/yt/core/concurrency/unittests/ya.make
+++ b/yt/yt/core/concurrency/unittests/ya.make
@@ -26,6 +26,7 @@ SRCS(
nonblocking_batcher_ut.cpp
nonblocking_queue_ut.cpp
periodic_ut.cpp
+ profiled_fair_share_invoker_pool_ut.cpp
propagating_storage_ut.cpp
quantized_executor_ut.cpp
scheduled_executor_ut.cpp