aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-02-29 07:31:23 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-02-29 07:39:42 +0300
commit153135af2652773157f98b91e31119d8f3672447 (patch)
tree5454f846246ba682c5ece63f9f6daa01aaac08e2
parent784c58b378e628a93382de0e56323e780e51fbe5 (diff)
downloadydb-153135af2652773157f98b91e31119d8f3672447.tar.gz
Intermediate changes
-rw-r--r--contrib/python/httpcore/.dist-info/METADATA12
-rw-r--r--contrib/python/httpcore/httpcore/__init__.py2
-rw-r--r--contrib/python/httpcore/httpcore/_async/connection.py18
-rw-r--r--contrib/python/httpcore/httpcore/_async/connection_pool.py364
-rw-r--r--contrib/python/httpcore/httpcore/_async/http11.py3
-rw-r--r--contrib/python/httpcore/httpcore/_async/socks_proxy.py4
-rw-r--r--contrib/python/httpcore/httpcore/_sync/connection.py18
-rw-r--r--contrib/python/httpcore/httpcore/_sync/connection_pool.py364
-rw-r--r--contrib/python/httpcore/httpcore/_sync/http11.py3
-rw-r--r--contrib/python/httpcore/httpcore/_synchronization.py58
-rw-r--r--contrib/python/httpcore/ya.make2
-rw-r--r--contrib/python/hypothesis/py3/.dist-info/METADATA6
-rw-r--r--contrib/python/hypothesis/py3/hypothesis/strategies/_internal/utils.py14
-rw-r--r--contrib/python/hypothesis/py3/hypothesis/version.py2
-rw-r--r--contrib/python/hypothesis/py3/ya.make2
15 files changed, 476 insertions, 396 deletions
diff --git a/contrib/python/httpcore/.dist-info/METADATA b/contrib/python/httpcore/.dist-info/METADATA
index 07eab9de21..51de714c58 100644
--- a/contrib/python/httpcore/.dist-info/METADATA
+++ b/contrib/python/httpcore/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: httpcore
-Version: 1.0.2
+Version: 1.0.3
Summary: A minimal low-level HTTP client.
Project-URL: Documentation, https://www.encode.io/httpcore
Project-URL: Homepage, https://www.encode.io/httpcore/
@@ -33,7 +33,7 @@ Requires-Dist: h2<5,>=3; extra == 'http2'
Provides-Extra: socks
Requires-Dist: socksio==1.*; extra == 'socks'
Provides-Extra: trio
-Requires-Dist: trio<0.23.0,>=0.22.0; extra == 'trio'
+Requires-Dist: trio<0.24.0,>=0.22.0; extra == 'trio'
Description-Content-Type: text/markdown
# HTTP Core
@@ -153,7 +153,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
-## 1.0.2 (November 10th, 2023)
+## 1.0.3 (February 13th, 2024)
+
+- Fix support for async cancellations. (#880)
+- Fix trace extension when used with socks proxy. (#849)
+- Fix SSL context for connections using the "wss" scheme (#869)
+
+## 1.0.2 (November 10th, 2023)
- Fix `float("inf")` timeouts in `Event.wait` function. (#846)
diff --git a/contrib/python/httpcore/httpcore/__init__.py b/contrib/python/httpcore/httpcore/__init__.py
index eb3e577186..3709fc4080 100644
--- a/contrib/python/httpcore/httpcore/__init__.py
+++ b/contrib/python/httpcore/httpcore/__init__.py
@@ -130,7 +130,7 @@ __all__ = [
"WriteError",
]
-__version__ = "1.0.2"
+__version__ = "1.0.3"
__locals = locals()
diff --git a/contrib/python/httpcore/httpcore/_async/connection.py b/contrib/python/httpcore/httpcore/_async/connection.py
index 45ee22a63d..2f439cf09c 100644
--- a/contrib/python/httpcore/httpcore/_async/connection.py
+++ b/contrib/python/httpcore/httpcore/_async/connection.py
@@ -6,7 +6,7 @@ from typing import Iterable, Iterator, Optional, Type
from .._backends.auto import AutoBackend
from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend, AsyncNetworkStream
-from .._exceptions import ConnectError, ConnectionNotAvailable, ConnectTimeout
+from .._exceptions import ConnectError, ConnectTimeout
from .._models import Origin, Request, Response
from .._ssl import default_ssl_context
from .._synchronization import AsyncLock
@@ -70,9 +70,9 @@ class AsyncHTTPConnection(AsyncConnectionInterface):
f"Attempted to send request to {request.url.origin} on connection to {self._origin}"
)
- async with self._request_lock:
- if self._connection is None:
- try:
+ try:
+ async with self._request_lock:
+ if self._connection is None:
stream = await self._connect(request)
ssl_object = stream.get_extra_info("ssl_object")
@@ -94,11 +94,9 @@ class AsyncHTTPConnection(AsyncConnectionInterface):
stream=stream,
keepalive_expiry=self._keepalive_expiry,
)
- except Exception as exc:
- self._connect_failed = True
- raise exc
- elif not self._connection.is_available():
- raise ConnectionNotAvailable()
+ except BaseException as exc:
+ self._connect_failed = True
+ raise exc
return await self._connection.handle_async_request(request)
@@ -137,7 +135,7 @@ class AsyncHTTPConnection(AsyncConnectionInterface):
)
trace.return_value = stream
- if self._origin.scheme == b"https":
+ if self._origin.scheme in (b"https", b"wss"):
ssl_context = (
default_ssl_context()
if self._ssl_context is None
diff --git a/contrib/python/httpcore/httpcore/_async/connection_pool.py b/contrib/python/httpcore/httpcore/_async/connection_pool.py
index 0320c6d80e..018b0ba234 100644
--- a/contrib/python/httpcore/httpcore/_async/connection_pool.py
+++ b/contrib/python/httpcore/httpcore/_async/connection_pool.py
@@ -1,31 +1,30 @@
import ssl
import sys
-import time
from types import TracebackType
from typing import AsyncIterable, AsyncIterator, Iterable, List, Optional, Type
from .._backends.auto import AutoBackend
from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend
-from .._exceptions import ConnectionNotAvailable, PoolTimeout, UnsupportedProtocol
+from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._models import Origin, Request, Response
-from .._synchronization import AsyncEvent, AsyncLock, AsyncShieldCancellation
+from .._synchronization import AsyncEvent, AsyncShieldCancellation, AsyncThreadLock
from .connection import AsyncHTTPConnection
from .interfaces import AsyncConnectionInterface, AsyncRequestInterface
-class RequestStatus:
- def __init__(self, request: Request):
+class AsyncPoolRequest:
+ def __init__(self, request: Request) -> None:
self.request = request
self.connection: Optional[AsyncConnectionInterface] = None
self._connection_acquired = AsyncEvent()
- def set_connection(self, connection: AsyncConnectionInterface) -> None:
- assert self.connection is None
+ def assign_to_connection(
+ self, connection: Optional[AsyncConnectionInterface]
+ ) -> None:
self.connection = connection
self._connection_acquired.set()
- def unset_connection(self) -> None:
- assert self.connection is not None
+ def clear_connection(self) -> None:
self.connection = None
self._connection_acquired = AsyncEvent()
@@ -37,6 +36,9 @@ class RequestStatus:
assert self.connection is not None
return self.connection
+ def is_queued(self) -> bool:
+ return self.connection is None
+
class AsyncConnectionPool(AsyncRequestInterface):
"""
@@ -107,14 +109,21 @@ class AsyncConnectionPool(AsyncRequestInterface):
self._local_address = local_address
self._uds = uds
- self._pool: List[AsyncConnectionInterface] = []
- self._requests: List[RequestStatus] = []
- self._pool_lock = AsyncLock()
self._network_backend = (
AutoBackend() if network_backend is None else network_backend
)
self._socket_options = socket_options
+ # The mutable state on a connection pool is the queue of incoming requests,
+ # and the set of connections that are servicing those requests.
+ self._connections: List[AsyncConnectionInterface] = []
+ self._requests: List[AsyncPoolRequest] = []
+
+ # We only mutate the state of the connection pool within an 'optional_thread_lock'
+ # context. This holds a threading lock unless we're running in async mode,
+ # in which case it is a no-op.
+ self._optional_thread_lock = AsyncThreadLock()
+
def create_connection(self, origin: Origin) -> AsyncConnectionInterface:
return AsyncHTTPConnection(
origin=origin,
@@ -145,64 +154,7 @@ class AsyncConnectionPool(AsyncRequestInterface):
]
```
"""
- return list(self._pool)
-
- async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
- """
- Attempt to provide a connection that can handle the given origin.
- """
- origin = status.request.url.origin
-
- # If there are queued requests in front of us, then don't acquire a
- # connection. We handle requests strictly in order.
- waiting = [s for s in self._requests if s.connection is None]
- if waiting and waiting[0] is not status:
- return False
-
- # Reuse an existing connection if one is currently available.
- for idx, connection in enumerate(self._pool):
- if connection.can_handle_request(origin) and connection.is_available():
- self._pool.pop(idx)
- self._pool.insert(0, connection)
- status.set_connection(connection)
- return True
-
- # If the pool is currently full, attempt to close one idle connection.
- if len(self._pool) >= self._max_connections:
- for idx, connection in reversed(list(enumerate(self._pool))):
- if connection.is_idle():
- await connection.aclose()
- self._pool.pop(idx)
- break
-
- # If the pool is still full, then we cannot acquire a connection.
- if len(self._pool) >= self._max_connections:
- return False
-
- # Otherwise create a new connection.
- connection = self.create_connection(origin)
- self._pool.insert(0, connection)
- status.set_connection(connection)
- return True
-
- async def _close_expired_connections(self) -> None:
- """
- Clean up the connection pool by closing off any connections that have expired.
- """
- # Close any connections that have expired their keep-alive time.
- for idx, connection in reversed(list(enumerate(self._pool))):
- if connection.has_expired():
- await connection.aclose()
- self._pool.pop(idx)
-
- # If the pool size exceeds the maximum number of allowed keep-alive connections,
- # then close off idle connections as required.
- pool_size = len(self._pool)
- for idx, connection in reversed(list(enumerate(self._pool))):
- if connection.is_idle() and pool_size > self._max_keepalive_connections:
- await connection.aclose()
- self._pool.pop(idx)
- pool_size -= 1
+ return list(self._connections)
async def handle_async_request(self, request: Request) -> Response:
"""
@@ -220,116 +172,147 @@ class AsyncConnectionPool(AsyncRequestInterface):
f"Request URL has an unsupported protocol '{scheme}://'."
)
- status = RequestStatus(request)
timeouts = request.extensions.get("timeout", {})
timeout = timeouts.get("pool", None)
- if timeout is not None:
- deadline = time.monotonic() + timeout
- else:
- deadline = float("inf")
-
- async with self._pool_lock:
- self._requests.append(status)
- await self._close_expired_connections()
- await self._attempt_to_acquire_connection(status)
-
- while True:
- try:
- connection = await status.wait_for_connection(timeout=timeout)
- except BaseException as exc:
- # If we timeout here, or if the task is cancelled, then make
- # sure to remove the request from the queue before bubbling
- # up the exception.
- async with self._pool_lock:
- # Ensure only remove when task exists.
- if status in self._requests:
- self._requests.remove(status)
- raise exc
-
- try:
- response = await connection.handle_async_request(request)
- except ConnectionNotAvailable:
- # The ConnectionNotAvailable exception is a special case, that
- # indicates we need to retry the request on a new connection.
- #
- # The most common case where this can occur is when multiple
- # requests are queued waiting for a single connection, which
- # might end up as an HTTP/2 connection, but which actually ends
- # up as HTTP/1.1.
- async with self._pool_lock:
- # Maintain our position in the request queue, but reset the
- # status so that the request becomes queued again.
- status.unset_connection()
- await self._attempt_to_acquire_connection(status)
- except BaseException as exc:
- with AsyncShieldCancellation():
- await self.response_closed(status)
- raise exc
- else:
- break
-
- timeout = deadline - time.monotonic()
- if timeout < 0:
- raise PoolTimeout # pragma: nocover
-
- # When we return the response, we wrap the stream in a special class
- # that handles notifying the connection pool once the response
- # has been released.
+ with self._optional_thread_lock:
+ # Add the incoming request to our request queue.
+ pool_request = AsyncPoolRequest(request)
+ self._requests.append(pool_request)
+
+ try:
+ while True:
+ with self._optional_thread_lock:
+ # Assign incoming requests to available connections,
+ # closing or creating new connections as required.
+ closing = self._assign_requests_to_connections()
+ await self._close_connections(closing)
+
+ # Wait until this request has an assigned connection.
+ connection = await pool_request.wait_for_connection(timeout=timeout)
+
+ try:
+ # Send the request on the assigned connection.
+ response = await connection.handle_async_request(
+ pool_request.request
+ )
+ except ConnectionNotAvailable:
+ # In some cases a connection may initially be available to
+ # handle a request, but then become unavailable.
+ #
+ # In this case we clear the connection and try again.
+ pool_request.clear_connection()
+ else:
+ break # pragma: nocover
+
+ except BaseException as exc:
+ with self._optional_thread_lock:
+ # For any exception or cancellation we remove the request from
+ # the queue, and then re-assign requests to connections.
+ self._requests.remove(pool_request)
+ closing = self._assign_requests_to_connections()
+
+ await self._close_connections(closing)
+ raise exc from None
+
+ # Return the response. Note that in this case we still have to manage
+ # the point at which the response is closed.
assert isinstance(response.stream, AsyncIterable)
return Response(
status=response.status,
headers=response.headers,
- content=ConnectionPoolByteStream(response.stream, self, status),
+ content=PoolByteStream(
+ stream=response.stream, pool_request=pool_request, pool=self
+ ),
extensions=response.extensions,
)
- async def response_closed(self, status: RequestStatus) -> None:
+ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
"""
- This method acts as a callback once the request/response cycle is complete.
+ Manage the state of the connection pool, assigning incoming
+ requests to connections as available.
- It is called into from the `ConnectionPoolByteStream.aclose()` method.
- """
- assert status.connection is not None
- connection = status.connection
-
- async with self._pool_lock:
- # Update the state of the connection pool.
- if status in self._requests:
- self._requests.remove(status)
-
- if connection.is_closed() and connection in self._pool:
- self._pool.remove(connection)
-
- # Since we've had a response closed, it's possible we'll now be able
- # to service one or more requests that are currently pending.
- for status in self._requests:
- if status.connection is None:
- acquired = await self._attempt_to_acquire_connection(status)
- # If we could not acquire a connection for a queued request
- # then we don't need to check anymore requests that are
- # queued later behind it.
- if not acquired:
- break
-
- # Housekeeping.
- await self._close_expired_connections()
+ Called whenever a new request is added or removed from the pool.
- async def aclose(self) -> None:
+ Any closing connections are returned, allowing the I/O for closing
+ those connections to be handled seperately.
"""
- Close any connections in the pool.
- """
- async with self._pool_lock:
- for connection in self._pool:
+ closing_connections = []
+
+ # First we handle cleaning up any connections that are closed,
+ # have expired their keep-alive, or surplus idle connections.
+ for connection in list(self._connections):
+ if connection.is_closed():
+ # log: "removing closed connection"
+ self._connections.remove(connection)
+ elif connection.has_expired():
+ # log: "closing expired connection"
+ self._connections.remove(connection)
+ closing_connections.append(connection)
+ elif (
+ connection.is_idle()
+ and len([connection.is_idle() for connection in self._connections])
+ > self._max_keepalive_connections
+ ):
+ # log: "closing idle connection"
+ self._connections.remove(connection)
+ closing_connections.append(connection)
+
+ # Assign queued requests to connections.
+ queued_requests = [request for request in self._requests if request.is_queued()]
+ for pool_request in queued_requests:
+ origin = pool_request.request.url.origin
+ avilable_connections = [
+ connection
+ for connection in self._connections
+ if connection.can_handle_request(origin) and connection.is_available()
+ ]
+ idle_connections = [
+ connection for connection in self._connections if connection.is_idle()
+ ]
+
+ # There are three cases for how we may be able to handle the request:
+ #
+ # 1. There is an existing connection that can handle the request.
+ # 2. We can create a new connection to handle the request.
+ # 3. We can close an idle connection and then create a new connection
+ # to handle the request.
+ if avilable_connections:
+ # log: "reusing existing connection"
+ connection = avilable_connections[0]
+ pool_request.assign_to_connection(connection)
+ elif len(self._connections) < self._max_connections:
+ # log: "creating new connection"
+ connection = self.create_connection(origin)
+ self._connections.append(connection)
+ pool_request.assign_to_connection(connection)
+ elif idle_connections:
+ # log: "closing idle connection"
+ connection = idle_connections[0]
+ self._connections.remove(connection)
+ closing_connections.append(connection)
+ # log: "creating new connection"
+ connection = self.create_connection(origin)
+ self._connections.append(connection)
+ pool_request.assign_to_connection(connection)
+
+ return closing_connections
+
+ async def _close_connections(self, closing: List[AsyncConnectionInterface]) -> None:
+ # Close connections which have been removed from the pool.
+ with AsyncShieldCancellation():
+ for connection in closing:
await connection.aclose()
- self._pool = []
- self._requests = []
+
+ async def aclose(self) -> None:
+ # Explicitly close the connection pool.
+ # Clears all existing requests and connections.
+ with self._optional_thread_lock:
+ closing_connections = list(self._connections)
+ self._connections = []
+ await self._close_connections(closing_connections)
async def __aenter__(self) -> "AsyncConnectionPool":
- # Acquiring the pool lock here ensures that we have the
- # correct dependencies installed as early as possible.
- async with self._pool_lock:
- pass
return self
async def __aexit__(
@@ -340,31 +323,58 @@ class AsyncConnectionPool(AsyncRequestInterface):
) -> None:
await self.aclose()
+ def __repr__(self) -> str:
+ class_name = self.__class__.__name__
+ with self._optional_thread_lock:
+ request_is_queued = [request.is_queued() for request in self._requests]
+ connection_is_idle = [
+ connection.is_idle() for connection in self._connections
+ ]
+
+ num_active_requests = request_is_queued.count(False)
+ num_queued_requests = request_is_queued.count(True)
+ num_active_connections = connection_is_idle.count(False)
+ num_idle_connections = connection_is_idle.count(True)
+
+ requests_info = (
+ f"Requests: {num_active_requests} active, {num_queued_requests} queued"
+ )
+ connection_info = (
+ f"Connections: {num_active_connections} active, {num_idle_connections} idle"
+ )
+
+ return f"<{class_name} [{requests_info} | {connection_info}]>"
-class ConnectionPoolByteStream:
- """
- A wrapper around the response byte stream, that additionally handles
- notifying the connection pool when the response has been closed.
- """
+class PoolByteStream:
def __init__(
self,
stream: AsyncIterable[bytes],
+ pool_request: AsyncPoolRequest,
pool: AsyncConnectionPool,
- status: RequestStatus,
) -> None:
self._stream = stream
+ self._pool_request = pool_request
self._pool = pool
- self._status = status
+ self._closed = False
async def __aiter__(self) -> AsyncIterator[bytes]:
- async for part in self._stream:
- yield part
+ try:
+ async for part in self._stream:
+ yield part
+ except BaseException as exc:
+ await self.aclose()
+ raise exc from None
async def aclose(self) -> None:
- try:
- if hasattr(self._stream, "aclose"):
- await self._stream.aclose()
- finally:
+ if not self._closed:
+ self._closed = True
with AsyncShieldCancellation():
- await self._pool.response_closed(self._status)
+ if hasattr(self._stream, "aclose"):
+ await self._stream.aclose()
+
+ with self._pool._optional_thread_lock:
+ self._pool._requests.remove(self._pool_request)
+ closing = self._pool._assign_requests_to_connections()
+
+ await self._pool._close_connections(closing)
diff --git a/contrib/python/httpcore/httpcore/_async/http11.py b/contrib/python/httpcore/httpcore/_async/http11.py
index 32fa3a6f23..a5eb480840 100644
--- a/contrib/python/httpcore/httpcore/_async/http11.py
+++ b/contrib/python/httpcore/httpcore/_async/http11.py
@@ -10,7 +10,6 @@ from typing import (
Tuple,
Type,
Union,
- cast,
)
import h11
@@ -228,7 +227,7 @@ class AsyncHTTP11Connection(AsyncConnectionInterface):
self._h11_state.receive_data(data)
else:
# mypy fails to narrow the type in the above if statement above
- return cast(Union[h11.Event, Type[h11.PAUSED]], event)
+ return event # type: ignore[return-value]
async def _response_closed(self) -> None:
async with self._state_lock:
diff --git a/contrib/python/httpcore/httpcore/_async/socks_proxy.py b/contrib/python/httpcore/httpcore/_async/socks_proxy.py
index 08a065d6d1..f839603fe5 100644
--- a/contrib/python/httpcore/httpcore/_async/socks_proxy.py
+++ b/contrib/python/httpcore/httpcore/_async/socks_proxy.py
@@ -228,7 +228,7 @@ class AsyncSocks5Connection(AsyncConnectionInterface):
"port": self._proxy_origin.port,
"timeout": timeout,
}
- with Trace("connect_tcp", logger, request, kwargs) as trace:
+ async with Trace("connect_tcp", logger, request, kwargs) as trace:
stream = await self._network_backend.connect_tcp(**kwargs)
trace.return_value = stream
@@ -239,7 +239,7 @@ class AsyncSocks5Connection(AsyncConnectionInterface):
"port": self._remote_origin.port,
"auth": self._proxy_auth,
}
- with Trace(
+ async with Trace(
"setup_socks5_connection", logger, request, kwargs
) as trace:
await _init_socks5_connection(**kwargs)
diff --git a/contrib/python/httpcore/httpcore/_sync/connection.py b/contrib/python/httpcore/httpcore/_sync/connection.py
index 81e4172a21..c3890f340c 100644
--- a/contrib/python/httpcore/httpcore/_sync/connection.py
+++ b/contrib/python/httpcore/httpcore/_sync/connection.py
@@ -6,7 +6,7 @@ from typing import Iterable, Iterator, Optional, Type
from .._backends.sync import SyncBackend
from .._backends.base import SOCKET_OPTION, NetworkBackend, NetworkStream
-from .._exceptions import ConnectError, ConnectionNotAvailable, ConnectTimeout
+from .._exceptions import ConnectError, ConnectTimeout
from .._models import Origin, Request, Response
from .._ssl import default_ssl_context
from .._synchronization import Lock
@@ -70,9 +70,9 @@ class HTTPConnection(ConnectionInterface):
f"Attempted to send request to {request.url.origin} on connection to {self._origin}"
)
- with self._request_lock:
- if self._connection is None:
- try:
+ try:
+ with self._request_lock:
+ if self._connection is None:
stream = self._connect(request)
ssl_object = stream.get_extra_info("ssl_object")
@@ -94,11 +94,9 @@ class HTTPConnection(ConnectionInterface):
stream=stream,
keepalive_expiry=self._keepalive_expiry,
)
- except Exception as exc:
- self._connect_failed = True
- raise exc
- elif not self._connection.is_available():
- raise ConnectionNotAvailable()
+ except BaseException as exc:
+ self._connect_failed = True
+ raise exc
return self._connection.handle_request(request)
@@ -137,7 +135,7 @@ class HTTPConnection(ConnectionInterface):
)
trace.return_value = stream
- if self._origin.scheme == b"https":
+ if self._origin.scheme in (b"https", b"wss"):
ssl_context = (
default_ssl_context()
if self._ssl_context is None
diff --git a/contrib/python/httpcore/httpcore/_sync/connection_pool.py b/contrib/python/httpcore/httpcore/_sync/connection_pool.py
index ccfb8d2220..8dcf348cac 100644
--- a/contrib/python/httpcore/httpcore/_sync/connection_pool.py
+++ b/contrib/python/httpcore/httpcore/_sync/connection_pool.py
@@ -1,31 +1,30 @@
import ssl
import sys
-import time
from types import TracebackType
from typing import Iterable, Iterator, Iterable, List, Optional, Type
from .._backends.sync import SyncBackend
from .._backends.base import SOCKET_OPTION, NetworkBackend
-from .._exceptions import ConnectionNotAvailable, PoolTimeout, UnsupportedProtocol
+from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._models import Origin, Request, Response
-from .._synchronization import Event, Lock, ShieldCancellation
+from .._synchronization import Event, ShieldCancellation, ThreadLock
from .connection import HTTPConnection
from .interfaces import ConnectionInterface, RequestInterface
-class RequestStatus:
- def __init__(self, request: Request):
+class PoolRequest:
+ def __init__(self, request: Request) -> None:
self.request = request
self.connection: Optional[ConnectionInterface] = None
self._connection_acquired = Event()
- def set_connection(self, connection: ConnectionInterface) -> None:
- assert self.connection is None
+ def assign_to_connection(
+ self, connection: Optional[ConnectionInterface]
+ ) -> None:
self.connection = connection
self._connection_acquired.set()
- def unset_connection(self) -> None:
- assert self.connection is not None
+ def clear_connection(self) -> None:
self.connection = None
self._connection_acquired = Event()
@@ -37,6 +36,9 @@ class RequestStatus:
assert self.connection is not None
return self.connection
+ def is_queued(self) -> bool:
+ return self.connection is None
+
class ConnectionPool(RequestInterface):
"""
@@ -107,14 +109,21 @@ class ConnectionPool(RequestInterface):
self._local_address = local_address
self._uds = uds
- self._pool: List[ConnectionInterface] = []
- self._requests: List[RequestStatus] = []
- self._pool_lock = Lock()
self._network_backend = (
SyncBackend() if network_backend is None else network_backend
)
self._socket_options = socket_options
+ # The mutable state on a connection pool is the queue of incoming requests,
+ # and the set of connections that are servicing those requests.
+ self._connections: List[ConnectionInterface] = []
+ self._requests: List[PoolRequest] = []
+
+ # We only mutate the state of the connection pool within an 'optional_thread_lock'
+ # context. This holds a threading lock unless we're running in async mode,
+ # in which case it is a no-op.
+ self._optional_thread_lock = ThreadLock()
+
def create_connection(self, origin: Origin) -> ConnectionInterface:
return HTTPConnection(
origin=origin,
@@ -145,64 +154,7 @@ class ConnectionPool(RequestInterface):
]
```
"""
- return list(self._pool)
-
- def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
- """
- Attempt to provide a connection that can handle the given origin.
- """
- origin = status.request.url.origin
-
- # If there are queued requests in front of us, then don't acquire a
- # connection. We handle requests strictly in order.
- waiting = [s for s in self._requests if s.connection is None]
- if waiting and waiting[0] is not status:
- return False
-
- # Reuse an existing connection if one is currently available.
- for idx, connection in enumerate(self._pool):
- if connection.can_handle_request(origin) and connection.is_available():
- self._pool.pop(idx)
- self._pool.insert(0, connection)
- status.set_connection(connection)
- return True
-
- # If the pool is currently full, attempt to close one idle connection.
- if len(self._pool) >= self._max_connections:
- for idx, connection in reversed(list(enumerate(self._pool))):
- if connection.is_idle():
- connection.close()
- self._pool.pop(idx)
- break
-
- # If the pool is still full, then we cannot acquire a connection.
- if len(self._pool) >= self._max_connections:
- return False
-
- # Otherwise create a new connection.
- connection = self.create_connection(origin)
- self._pool.insert(0, connection)
- status.set_connection(connection)
- return True
-
- def _close_expired_connections(self) -> None:
- """
- Clean up the connection pool by closing off any connections that have expired.
- """
- # Close any connections that have expired their keep-alive time.
- for idx, connection in reversed(list(enumerate(self._pool))):
- if connection.has_expired():
- connection.close()
- self._pool.pop(idx)
-
- # If the pool size exceeds the maximum number of allowed keep-alive connections,
- # then close off idle connections as required.
- pool_size = len(self._pool)
- for idx, connection in reversed(list(enumerate(self._pool))):
- if connection.is_idle() and pool_size > self._max_keepalive_connections:
- connection.close()
- self._pool.pop(idx)
- pool_size -= 1
+ return list(self._connections)
def handle_request(self, request: Request) -> Response:
"""
@@ -220,116 +172,147 @@ class ConnectionPool(RequestInterface):
f"Request URL has an unsupported protocol '{scheme}://'."
)
- status = RequestStatus(request)
timeouts = request.extensions.get("timeout", {})
timeout = timeouts.get("pool", None)
- if timeout is not None:
- deadline = time.monotonic() + timeout
- else:
- deadline = float("inf")
-
- with self._pool_lock:
- self._requests.append(status)
- self._close_expired_connections()
- self._attempt_to_acquire_connection(status)
-
- while True:
- try:
- connection = status.wait_for_connection(timeout=timeout)
- except BaseException as exc:
- # If we timeout here, or if the task is cancelled, then make
- # sure to remove the request from the queue before bubbling
- # up the exception.
- with self._pool_lock:
- # Ensure only remove when task exists.
- if status in self._requests:
- self._requests.remove(status)
- raise exc
-
- try:
- response = connection.handle_request(request)
- except ConnectionNotAvailable:
- # The ConnectionNotAvailable exception is a special case, that
- # indicates we need to retry the request on a new connection.
- #
- # The most common case where this can occur is when multiple
- # requests are queued waiting for a single connection, which
- # might end up as an HTTP/2 connection, but which actually ends
- # up as HTTP/1.1.
- with self._pool_lock:
- # Maintain our position in the request queue, but reset the
- # status so that the request becomes queued again.
- status.unset_connection()
- self._attempt_to_acquire_connection(status)
- except BaseException as exc:
- with ShieldCancellation():
- self.response_closed(status)
- raise exc
- else:
- break
-
- timeout = deadline - time.monotonic()
- if timeout < 0:
- raise PoolTimeout # pragma: nocover
-
- # When we return the response, we wrap the stream in a special class
- # that handles notifying the connection pool once the response
- # has been released.
+ with self._optional_thread_lock:
+ # Add the incoming request to our request queue.
+ pool_request = PoolRequest(request)
+ self._requests.append(pool_request)
+
+ try:
+ while True:
+ with self._optional_thread_lock:
+ # Assign incoming requests to available connections,
+ # closing or creating new connections as required.
+ closing = self._assign_requests_to_connections()
+ self._close_connections(closing)
+
+ # Wait until this request has an assigned connection.
+ connection = pool_request.wait_for_connection(timeout=timeout)
+
+ try:
+ # Send the request on the assigned connection.
+ response = connection.handle_request(
+ pool_request.request
+ )
+ except ConnectionNotAvailable:
+ # In some cases a connection may initially be available to
+ # handle a request, but then become unavailable.
+ #
+ # In this case we clear the connection and try again.
+ pool_request.clear_connection()
+ else:
+ break # pragma: nocover
+
+ except BaseException as exc:
+ with self._optional_thread_lock:
+ # For any exception or cancellation we remove the request from
+ # the queue, and then re-assign requests to connections.
+ self._requests.remove(pool_request)
+ closing = self._assign_requests_to_connections()
+
+ self._close_connections(closing)
+ raise exc from None
+
+ # Return the response. Note that in this case we still have to manage
+ # the point at which the response is closed.
assert isinstance(response.stream, Iterable)
return Response(
status=response.status,
headers=response.headers,
- content=ConnectionPoolByteStream(response.stream, self, status),
+ content=PoolByteStream(
+ stream=response.stream, pool_request=pool_request, pool=self
+ ),
extensions=response.extensions,
)
- def response_closed(self, status: RequestStatus) -> None:
+ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
"""
- This method acts as a callback once the request/response cycle is complete.
+ Manage the state of the connection pool, assigning incoming
+ requests to connections as available.
- It is called into from the `ConnectionPoolByteStream.close()` method.
- """
- assert status.connection is not None
- connection = status.connection
-
- with self._pool_lock:
- # Update the state of the connection pool.
- if status in self._requests:
- self._requests.remove(status)
-
- if connection.is_closed() and connection in self._pool:
- self._pool.remove(connection)
-
- # Since we've had a response closed, it's possible we'll now be able
- # to service one or more requests that are currently pending.
- for status in self._requests:
- if status.connection is None:
- acquired = self._attempt_to_acquire_connection(status)
- # If we could not acquire a connection for a queued request
- # then we don't need to check anymore requests that are
- # queued later behind it.
- if not acquired:
- break
-
- # Housekeeping.
- self._close_expired_connections()
+ Called whenever a new request is added or removed from the pool.
- def close(self) -> None:
+ Any closing connections are returned, allowing the I/O for closing
+ those connections to be handled seperately.
"""
- Close any connections in the pool.
- """
- with self._pool_lock:
- for connection in self._pool:
+ closing_connections = []
+
+ # First we handle cleaning up any connections that are closed,
+ # have expired their keep-alive, or surplus idle connections.
+ for connection in list(self._connections):
+ if connection.is_closed():
+ # log: "removing closed connection"
+ self._connections.remove(connection)
+ elif connection.has_expired():
+ # log: "closing expired connection"
+ self._connections.remove(connection)
+ closing_connections.append(connection)
+ elif (
+ connection.is_idle()
+ and len([connection.is_idle() for connection in self._connections])
+ > self._max_keepalive_connections
+ ):
+ # log: "closing idle connection"
+ self._connections.remove(connection)
+ closing_connections.append(connection)
+
+ # Assign queued requests to connections.
+ queued_requests = [request for request in self._requests if request.is_queued()]
+ for pool_request in queued_requests:
+ origin = pool_request.request.url.origin
+ avilable_connections = [
+ connection
+ for connection in self._connections
+ if connection.can_handle_request(origin) and connection.is_available()
+ ]
+ idle_connections = [
+ connection for connection in self._connections if connection.is_idle()
+ ]
+
+ # There are three cases for how we may be able to handle the request:
+ #
+ # 1. There is an existing connection that can handle the request.
+ # 2. We can create a new connection to handle the request.
+ # 3. We can close an idle connection and then create a new connection
+ # to handle the request.
+ if avilable_connections:
+ # log: "reusing existing connection"
+ connection = avilable_connections[0]
+ pool_request.assign_to_connection(connection)
+ elif len(self._connections) < self._max_connections:
+ # log: "creating new connection"
+ connection = self.create_connection(origin)
+ self._connections.append(connection)
+ pool_request.assign_to_connection(connection)
+ elif idle_connections:
+ # log: "closing idle connection"
+ connection = idle_connections[0]
+ self._connections.remove(connection)
+ closing_connections.append(connection)
+ # log: "creating new connection"
+ connection = self.create_connection(origin)
+ self._connections.append(connection)
+ pool_request.assign_to_connection(connection)
+
+ return closing_connections
+
+ def _close_connections(self, closing: List[ConnectionInterface]) -> None:
+ # Close connections which have been removed from the pool.
+ with ShieldCancellation():
+ for connection in closing:
connection.close()
- self._pool = []
- self._requests = []
+
+ def close(self) -> None:
+ # Explicitly close the connection pool.
+ # Clears all existing requests and connections.
+ with self._optional_thread_lock:
+ closing_connections = list(self._connections)
+ self._connections = []
+ self._close_connections(closing_connections)
def __enter__(self) -> "ConnectionPool":
- # Acquiring the pool lock here ensures that we have the
- # correct dependencies installed as early as possible.
- with self._pool_lock:
- pass
return self
def __exit__(
@@ -340,31 +323,58 @@ class ConnectionPool(RequestInterface):
) -> None:
self.close()
+ def __repr__(self) -> str:
+ class_name = self.__class__.__name__
+ with self._optional_thread_lock:
+ request_is_queued = [request.is_queued() for request in self._requests]
+ connection_is_idle = [
+ connection.is_idle() for connection in self._connections
+ ]
+
+ num_active_requests = request_is_queued.count(False)
+ num_queued_requests = request_is_queued.count(True)
+ num_active_connections = connection_is_idle.count(False)
+ num_idle_connections = connection_is_idle.count(True)
+
+ requests_info = (
+ f"Requests: {num_active_requests} active, {num_queued_requests} queued"
+ )
+ connection_info = (
+ f"Connections: {num_active_connections} active, {num_idle_connections} idle"
+ )
+
+ return f"<{class_name} [{requests_info} | {connection_info}]>"
-class ConnectionPoolByteStream:
- """
- A wrapper around the response byte stream, that additionally handles
- notifying the connection pool when the response has been closed.
- """
+class PoolByteStream:
def __init__(
self,
stream: Iterable[bytes],
+ pool_request: PoolRequest,
pool: ConnectionPool,
- status: RequestStatus,
) -> None:
self._stream = stream
+ self._pool_request = pool_request
self._pool = pool
- self._status = status
+ self._closed = False
def __iter__(self) -> Iterator[bytes]:
- for part in self._stream:
- yield part
+ try:
+ for part in self._stream:
+ yield part
+ except BaseException as exc:
+ self.close()
+ raise exc from None
def close(self) -> None:
- try:
- if hasattr(self._stream, "close"):
- self._stream.close()
- finally:
+ if not self._closed:
+ self._closed = True
with ShieldCancellation():
- self._pool.response_closed(self._status)
+ if hasattr(self._stream, "close"):
+ self._stream.close()
+
+ with self._pool._optional_thread_lock:
+ self._pool._requests.remove(self._pool_request)
+ closing = self._pool._assign_requests_to_connections()
+
+ self._pool._close_connections(closing)
diff --git a/contrib/python/httpcore/httpcore/_sync/http11.py b/contrib/python/httpcore/httpcore/_sync/http11.py
index 0cc100e3ff..e108f88b12 100644
--- a/contrib/python/httpcore/httpcore/_sync/http11.py
+++ b/contrib/python/httpcore/httpcore/_sync/http11.py
@@ -10,7 +10,6 @@ from typing import (
Tuple,
Type,
Union,
- cast,
)
import h11
@@ -228,7 +227,7 @@ class HTTP11Connection(ConnectionInterface):
self._h11_state.receive_data(data)
else:
# mypy fails to narrow the type in the above if statement above
- return cast(Union[h11.Event, Type[h11.PAUSED]], event)
+ return event # type: ignore[return-value]
def _response_closed(self) -> None:
with self._state_lock:
diff --git a/contrib/python/httpcore/httpcore/_synchronization.py b/contrib/python/httpcore/httpcore/_synchronization.py
index 119d89fc0d..9619a39835 100644
--- a/contrib/python/httpcore/httpcore/_synchronization.py
+++ b/contrib/python/httpcore/httpcore/_synchronization.py
@@ -45,6 +45,13 @@ def current_async_library() -> str:
class AsyncLock:
+ """
+ This is a standard lock.
+
+ In the sync case `Lock` provides thread locking.
+ In the async case `AsyncLock` provides async locking.
+ """
+
def __init__(self) -> None:
self._backend = ""
@@ -82,6 +89,26 @@ class AsyncLock:
self._anyio_lock.release()
+class AsyncThreadLock:
+ """
+ This is a threading-only lock for no-I/O contexts.
+
+ In the sync case `ThreadLock` provides thread locking.
+ In the async case `AsyncThreadLock` is a no-op.
+ """
+
+ def __enter__(self) -> "AsyncThreadLock":
+ return self
+
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]] = None,
+ exc_value: Optional[BaseException] = None,
+ traceback: Optional[TracebackType] = None,
+ ) -> None:
+ pass
+
+
class AsyncEvent:
def __init__(self) -> None:
self._backend = ""
@@ -202,6 +229,13 @@ class AsyncShieldCancellation:
class Lock:
+ """
+ This is a standard lock.
+
+ In the sync case `Lock` provides thread locking.
+ In the async case `AsyncLock` provides async locking.
+ """
+
def __init__(self) -> None:
self._lock = threading.Lock()
@@ -218,6 +252,30 @@ class Lock:
self._lock.release()
+class ThreadLock:
+ """
+ This is a threading-only lock for no-I/O contexts.
+
+ In the sync case `ThreadLock` provides thread locking.
+ In the async case `AsyncThreadLock` is a no-op.
+ """
+
+ def __init__(self) -> None:
+ self._lock = threading.Lock()
+
+ def __enter__(self) -> "ThreadLock":
+ self._lock.acquire()
+ return self
+
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]] = None,
+ exc_value: Optional[BaseException] = None,
+ traceback: Optional[TracebackType] = None,
+ ) -> None:
+ self._lock.release()
+
+
class Event:
def __init__(self) -> None:
self._event = threading.Event()
diff --git a/contrib/python/httpcore/ya.make b/contrib/python/httpcore/ya.make
index de7a3ac5a2..e8408ed893 100644
--- a/contrib/python/httpcore/ya.make
+++ b/contrib/python/httpcore/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(1.0.2)
+VERSION(1.0.3)
LICENSE(BSD-3-Clause)
diff --git a/contrib/python/hypothesis/py3/.dist-info/METADATA b/contrib/python/hypothesis/py3/.dist-info/METADATA
index 0b48218e7a..c2000db5a1 100644
--- a/contrib/python/hypothesis/py3/.dist-info/METADATA
+++ b/contrib/python/hypothesis/py3/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: hypothesis
-Version: 6.98.4
+Version: 6.98.5
Summary: A library for property-based testing
Home-page: https://hypothesis.works
Author: David R. MacIver and Zac Hatfield-Dodds
@@ -53,7 +53,7 @@ Requires-Dist: pytz >=2014.1 ; extra == 'all'
Requires-Dist: redis >=3.0.0 ; extra == 'all'
Requires-Dist: rich >=9.0.0 ; extra == 'all'
Requires-Dist: backports.zoneinfo >=0.2.1 ; (python_version < "3.9") and extra == 'all'
-Requires-Dist: tzdata >=2023.4 ; (sys_platform == "win32" or sys_platform == "emscripten") and extra == 'all'
+Requires-Dist: tzdata >=2024.1 ; (sys_platform == "win32" or sys_platform == "emscripten") and extra == 'all'
Provides-Extra: cli
Requires-Dist: click >=7.0 ; extra == 'cli'
Requires-Dist: black >=19.10b0 ; extra == 'cli'
@@ -82,7 +82,7 @@ Provides-Extra: redis
Requires-Dist: redis >=3.0.0 ; extra == 'redis'
Provides-Extra: zoneinfo
Requires-Dist: backports.zoneinfo >=0.2.1 ; (python_version < "3.9") and extra == 'zoneinfo'
-Requires-Dist: tzdata >=2023.4 ; (sys_platform == "win32" or sys_platform == "emscripten") and extra == 'zoneinfo'
+Requires-Dist: tzdata >=2024.1 ; (sys_platform == "win32" or sys_platform == "emscripten") and extra == 'zoneinfo'
==========
Hypothesis
diff --git a/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/utils.py b/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/utils.py
index 7bf0514d44..066c2b6581 100644
--- a/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/utils.py
+++ b/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/utils.py
@@ -176,6 +176,14 @@ def to_jsonable(obj: object) -> object:
for k, v in obj.items()
}
+ # Hey, might as well try calling a .to_json() method - it works for Pandas!
+ # We try this before the below general-purpose handlers to give folks a
+ # chance to control this behavior on their custom classes.
+ try:
+ return to_jsonable(obj.to_json()) # type: ignore
+ except Exception:
+ pass
+
# Special handling for dataclasses, attrs, and pydantic classes
if (
(dcs := sys.modules.get("dataclasses"))
@@ -188,11 +196,5 @@ def to_jsonable(obj: object) -> object:
if (pyd := sys.modules.get("pydantic")) and isinstance(obj, pyd.BaseModel):
return to_jsonable(obj.model_dump())
- # Hey, might as well try calling a .to_json() method - it works for Pandas!
- try:
- return to_jsonable(obj.to_json()) # type: ignore
- except Exception:
- pass
-
# If all else fails, we'll just pretty-print as a string.
return pretty(obj)
diff --git a/contrib/python/hypothesis/py3/hypothesis/version.py b/contrib/python/hypothesis/py3/hypothesis/version.py
index 8a33e96d3d..65284f2d57 100644
--- a/contrib/python/hypothesis/py3/hypothesis/version.py
+++ b/contrib/python/hypothesis/py3/hypothesis/version.py
@@ -8,5 +8,5 @@
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.
-__version_info__ = (6, 98, 4)
+__version_info__ = (6, 98, 5)
__version__ = ".".join(map(str, __version_info__))
diff --git a/contrib/python/hypothesis/py3/ya.make b/contrib/python/hypothesis/py3/ya.make
index bd45605d1c..c07c5f7974 100644
--- a/contrib/python/hypothesis/py3/ya.make
+++ b/contrib/python/hypothesis/py3/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(6.98.4)
+VERSION(6.98.5)
LICENSE(MPL-2.0)