summaryrefslogtreecommitdiffstats
path: root/contrib/python/httpcore
diff options
context:
space:
mode:
authormpereskokova <[email protected]>2025-02-05 20:01:29 +0300
committermpereskokova <[email protected]>2025-02-05 20:40:36 +0300
commit53c94da1fd856e0a2a445bb594b8da9ee02eaacb (patch)
tree9682f60801ab103e293e042512ace23444c1521a /contrib/python/httpcore
parentc9aba105985ed020cf7faab86131c62ca2e342d8 (diff)
Move libyqlplugin.so to yt
Приедет в ytsaurus: <HIDDEN_URL> Приедет в `/contrib/ydb/` <HIDDEN_URL> Проверка, что в github ydb ничего не сломается: <https://github.com/ydb-platform/ydb/pull/13286> commit_hash:73ab1b4a3245322afc9fc6e9d71424ad07106477
Diffstat (limited to 'contrib/python/httpcore')
-rw-r--r--contrib/python/httpcore/.dist-info/METADATA616
-rw-r--r--contrib/python/httpcore/.dist-info/top_level.txt4
-rw-r--r--contrib/python/httpcore/LICENSE.md27
-rw-r--r--contrib/python/httpcore/README.md111
-rw-r--r--contrib/python/httpcore/httpcore/__init__.py140
-rw-r--r--contrib/python/httpcore/httpcore/_api.py94
-rw-r--r--contrib/python/httpcore/httpcore/_async/__init__.py39
-rw-r--r--contrib/python/httpcore/httpcore/_async/connection.py222
-rw-r--r--contrib/python/httpcore/httpcore/_async/connection_pool.py420
-rw-r--r--contrib/python/httpcore/httpcore/_async/http11.py379
-rw-r--r--contrib/python/httpcore/httpcore/_async/http2.py583
-rw-r--r--contrib/python/httpcore/httpcore/_async/http_proxy.py367
-rw-r--r--contrib/python/httpcore/httpcore/_async/interfaces.py137
-rw-r--r--contrib/python/httpcore/httpcore/_async/socks_proxy.py341
-rw-r--r--contrib/python/httpcore/httpcore/_backends/__init__.py0
-rw-r--r--contrib/python/httpcore/httpcore/_backends/anyio.py146
-rw-r--r--contrib/python/httpcore/httpcore/_backends/auto.py52
-rw-r--r--contrib/python/httpcore/httpcore/_backends/base.py101
-rw-r--r--contrib/python/httpcore/httpcore/_backends/mock.py143
-rw-r--r--contrib/python/httpcore/httpcore/_backends/sync.py241
-rw-r--r--contrib/python/httpcore/httpcore/_backends/trio.py159
-rw-r--r--contrib/python/httpcore/httpcore/_exceptions.py81
-rw-r--r--contrib/python/httpcore/httpcore/_models.py516
-rw-r--r--contrib/python/httpcore/httpcore/_ssl.py9
-rw-r--r--contrib/python/httpcore/httpcore/_sync/__init__.py39
-rw-r--r--contrib/python/httpcore/httpcore/_sync/connection.py222
-rw-r--r--contrib/python/httpcore/httpcore/_sync/connection_pool.py420
-rw-r--r--contrib/python/httpcore/httpcore/_sync/http11.py379
-rw-r--r--contrib/python/httpcore/httpcore/_sync/http2.py583
-rw-r--r--contrib/python/httpcore/httpcore/_sync/http_proxy.py367
-rw-r--r--contrib/python/httpcore/httpcore/_sync/interfaces.py137
-rw-r--r--contrib/python/httpcore/httpcore/_sync/socks_proxy.py341
-rw-r--r--contrib/python/httpcore/httpcore/_synchronization.py318
-rw-r--r--contrib/python/httpcore/httpcore/_trace.py107
-rw-r--r--contrib/python/httpcore/httpcore/_utils.py37
-rw-r--r--contrib/python/httpcore/httpcore/py.typed0
-rw-r--r--contrib/python/httpcore/ya.make66
37 files changed, 0 insertions, 7944 deletions
diff --git a/contrib/python/httpcore/.dist-info/METADATA b/contrib/python/httpcore/.dist-info/METADATA
deleted file mode 100644
index 99be2236cdd..00000000000
--- a/contrib/python/httpcore/.dist-info/METADATA
+++ /dev/null
@@ -1,616 +0,0 @@
-Metadata-Version: 2.3
-Name: httpcore
-Version: 1.0.7
-Summary: A minimal low-level HTTP client.
-Project-URL: Documentation, https://www.encode.io/httpcore
-Project-URL: Homepage, https://www.encode.io/httpcore/
-Project-URL: Source, https://github.com/encode/httpcore
-Author-email: Tom Christie <[email protected]>
-License: BSD-3-Clause
-Classifier: Development Status :: 3 - Alpha
-Classifier: Environment :: Web Environment
-Classifier: Framework :: AsyncIO
-Classifier: Framework :: Trio
-Classifier: Intended Audience :: Developers
-Classifier: License :: OSI Approved :: BSD License
-Classifier: Operating System :: OS Independent
-Classifier: Programming Language :: Python :: 3
-Classifier: Programming Language :: Python :: 3 :: Only
-Classifier: Programming Language :: Python :: 3.8
-Classifier: Programming Language :: Python :: 3.9
-Classifier: Programming Language :: Python :: 3.10
-Classifier: Programming Language :: Python :: 3.11
-Classifier: Programming Language :: Python :: 3.12
-Classifier: Topic :: Internet :: WWW/HTTP
-Requires-Python: >=3.8
-Requires-Dist: certifi
-Requires-Dist: h11<0.15,>=0.13
-Provides-Extra: asyncio
-Requires-Dist: anyio<5.0,>=4.0; extra == 'asyncio'
-Provides-Extra: http2
-Requires-Dist: h2<5,>=3; extra == 'http2'
-Provides-Extra: socks
-Requires-Dist: socksio==1.*; extra == 'socks'
-Provides-Extra: trio
-Requires-Dist: trio<1.0,>=0.22.0; extra == 'trio'
-Description-Content-Type: text/markdown
-
-# HTTP Core
-
-[![Test Suite](https://github.com/encode/httpcore/workflows/Test%20Suite/badge.svg)](https://github.com/encode/httpcore/actions)
-[![Package version](https://badge.fury.io/py/httpcore.svg)](https://pypi.org/project/httpcore/)
-
-> *Do one thing, and do it well.*
-
-The HTTP Core package provides a minimal low-level HTTP client, which does
-one thing only. Sending HTTP requests.
-
-It does not provide any high level model abstractions over the API,
-does not handle redirects, multipart uploads, building authentication headers,
-transparent HTTP caching, URL parsing, session cookie handling,
-content or charset decoding, handling JSON, environment based configuration
-defaults, or any of that Jazz.
-
-Some things HTTP Core does do:
-
-* Sending HTTP requests.
-* Thread-safe / task-safe connection pooling.
-* HTTP(S) proxy & SOCKS proxy support.
-* Supports HTTP/1.1 and HTTP/2.
-* Provides both sync and async interfaces.
-* Async backend support for `asyncio` and `trio`.
-
-## Requirements
-
-Python 3.8+
-
-## Installation
-
-For HTTP/1.1 only support, install with:
-
-```shell
-$ pip install httpcore
-```
-
-There are also a number of optional extras available...
-
-```shell
-$ pip install httpcore['asyncio,trio,http2,socks']
-```
-
-## Sending requests
-
-Send an HTTP request:
-
-```python
-import httpcore
-
-response = httpcore.request("GET", "https://www.example.com/")
-
-print(response)
-# <Response [200]>
-print(response.status)
-# 200
-print(response.headers)
-# [(b'Accept-Ranges', b'bytes'), (b'Age', b'557328'), (b'Cache-Control', b'max-age=604800'), ...]
-print(response.content)
-# b'<!doctype html>\n<html>\n<head>\n<title>Example Domain</title>\n\n<meta charset="utf-8"/>\n ...'
-```
-
-The top-level `httpcore.request()` function is provided for convenience. In practice whenever you're working with `httpcore` you'll want to use the connection pooling functionality that it provides.
-
-```python
-import httpcore
-
-http = httpcore.ConnectionPool()
-response = http.request("GET", "https://www.example.com/")
-```
-
-Once you're ready to get going, [head over to the documentation](https://www.encode.io/httpcore/).
-
-## Motivation
-
-You *probably* don't want to be using HTTP Core directly. It might make sense if
-you're writing something like a proxy service in Python, and you just want
-something at the lowest possible level, but more typically you'll want to use
-a higher level client library, such as `httpx`.
-
-The motivation for `httpcore` is:
-
-* To provide a reusable low-level client library, that other packages can then build on top of.
-* To provide a *really clear interface split* between the networking code and client logic,
- so that each is easier to understand and reason about in isolation.
-
-## Dependencies
-
-The `httpcore` package has the following dependencies...
-
-* `h11`
-* `certifi`
-
-And the following optional extras...
-
-* `anyio` - Required by `pip install httpcore['asyncio']`.
-* `trio` - Required by `pip install httpcore['trio']`.
-* `h2` - Required by `pip install httpcore['http2']`.
-* `socksio` - Required by `pip install httpcore['socks']`.
-
-## Versioning
-
-We use [SEMVER for our versioning policy](https://semver.org/).
-
-For changes between package versions please see our [project changelog](CHANGELOG.md).
-
-We recommend pinning your requirements either the most current major version, or a more specific version range:
-
-```python
-pip install 'httpcore==1.*'
-```
-# Changelog
-
-All notable changes to this project will be documented in this file.
-
-The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
-
-## Version 1.0.7 (November 15th, 2024)
-
-- Support `proxy=…` configuration on `ConnectionPool()`. (#974)
-
-## Version 1.0.6 (October 1st, 2024)
-
-- Relax `trio` dependency pinning. (#956)
-- Handle `trio` raising `NotImplementedError` on unsupported platforms. (#955)
-- Handle mapping `ssl.SSLError` to `httpcore.ConnectError`. (#918)
-
-## 1.0.5 (March 27th, 2024)
-
-- Handle `EndOfStream` exception for anyio backend. (#899)
-- Allow trio `0.25.*` series in package dependancies. (#903)
-
-## 1.0.4 (February 21st, 2024)
-
-- Add `target` request extension. (#888)
-- Fix support for connection `Upgrade` and `CONNECT` when some data in the stream has been read. (#882)
-
-## 1.0.3 (February 13th, 2024)
-
-- Fix support for async cancellations. (#880)
-- Fix trace extension when used with socks proxy. (#849)
-- Fix SSL context for connections using the "wss" scheme (#869)
-
-## 1.0.2 (November 10th, 2023)
-
-- Fix `float("inf")` timeouts in `Event.wait` function. (#846)
-
-## 1.0.1 (November 3rd, 2023)
-
-- Fix pool timeout to account for the total time spent retrying. (#823)
-- Raise a neater RuntimeError when the correct async deps are not installed. (#826)
-- Add support for synchronous TLS-in-TLS streams. (#840)
-
-## 1.0.0 (October 6th, 2023)
-
-From version 1.0 our async support is now optional, as the package has minimal dependencies by default.
-
-For async support use either `pip install 'httpcore[asyncio]'` or `pip install 'httpcore[trio]'`.
-
-The project versioning policy is now explicitly governed by SEMVER. See https://semver.org/.
-
-- Async support becomes fully optional. (#809)
-- Add support for Python 3.12. (#807)
-
-## 0.18.0 (September 8th, 2023)
-
-- Add support for HTTPS proxies. (#745, #786)
-- Drop Python 3.7 support. (#727)
-- Handle `sni_hostname` extension with SOCKS proxy. (#774)
-- Handle HTTP/1.1 half-closed connections gracefully. (#641)
-- Change the type of `Extensions` from `Mapping[Str, Any]` to `MutableMapping[Str, Any]`. (#762)
-
-## 0.17.3 (July 5th, 2023)
-
-- Support async cancellations, ensuring that the connection pool is left in a clean state when cancellations occur. (#726)
-- The networking backend interface has [been added to the public API](https://www.encode.io/httpcore/network-backends). Some classes which were previously private implementation detail are now part of the top-level public API. (#699)
-- Graceful handling of HTTP/2 GoAway frames, with requests being transparently retried on a new connection. (#730)
-- Add exceptions when a synchronous `trace callback` is passed to an asynchronous request or an asynchronous `trace callback` is passed to a synchronous request. (#717)
-- Drop Python 3.7 support. (#727)
-
-## 0.17.2 (May 23th, 2023)
-
-- Add `socket_options` argument to `ConnectionPool` and `HTTProxy` classes. (#668)
-- Improve logging with per-module logger names. (#690)
-- Add `sni_hostname` request extension. (#696)
-- Resolve race condition during import of `anyio` package. (#692)
-- Enable TCP_NODELAY for all synchronous sockets. (#651)
-
-## 0.17.1 (May 17th, 2023)
-
-- If 'retries' is set, then allow retries if an SSL handshake error occurs. (#669)
-- Improve correctness of tracebacks on network exceptions, by raising properly chained exceptions. (#678)
-- Prevent connection-hanging behaviour when HTTP/2 connections are closed by a server-sent 'GoAway' frame. (#679)
-- Fix edge-case exception when removing requests from the connection pool. (#680)
-- Fix pool timeout edge-case. (#688)
-
-## 0.17.0 (March 16th, 2023)
-
-- Add DEBUG level logging. (#648)
-- Respect HTTP/2 max concurrent streams when settings updates are sent by server. (#652)
-- Increase the allowable HTTP header size to 100kB. (#647)
-- Add `retries` option to SOCKS proxy classes. (#643)
-
-## 0.16.3 (December 20th, 2022)
-
-- Allow `ws` and `wss` schemes. Allows us to properly support websocket upgrade connections. (#625)
-- Forwarding HTTP proxies use a connection-per-remote-host. Required by some proxy implementations. (#637)
-- Don't raise `RuntimeError` when closing a connection pool with active connections. Removes some error cases when cancellations are used. (#631)
-- Lazy import `anyio`, so that it's no longer a hard dependancy, and isn't imported if unused. (#639)
-
-## 0.16.2 (November 25th, 2022)
-
-- Revert 'Fix async cancellation behaviour', which introduced race conditions. (#627)
-- Raise `RuntimeError` if attempting to us UNIX domain sockets on Windows. (#619)
-
-## 0.16.1 (November 17th, 2022)
-
-- Fix HTTP/1.1 interim informational responses, such as "100 Continue". (#605)
-
-## 0.16.0 (October 11th, 2022)
-
-- Support HTTP/1.1 informational responses. (#581)
-- Fix async cancellation behaviour. (#580)
-- Support `h11` 0.14. (#579)
-
-## 0.15.0 (May 17th, 2022)
-
-- Drop Python 3.6 support (#535)
-- Ensure HTTP proxy CONNECT requests include `timeout` configuration. (#506)
-- Switch to explicit `typing.Optional` for type hints. (#513)
-- For `trio` map OSError exceptions to `ConnectError`. (#543)
-
-## 0.14.7 (February 4th, 2022)
-
-- Requests which raise a PoolTimeout need to be removed from the pool queue. (#502)
-- Fix AttributeError that happened when Socks5Connection were terminated. (#501)
-
-## 0.14.6 (February 1st, 2022)
-
-- Fix SOCKS support for `http://` URLs. (#492)
-- Resolve race condition around exceptions during streaming a response. (#491)
-
-## 0.14.5 (January 18th, 2022)
-
-- SOCKS proxy support. (#478)
-- Add proxy_auth argument to HTTPProxy. (#481)
-- Improve error message on 'RemoteProtocolError' exception when server disconnects without sending a response. (#479)
-
-## 0.14.4 (January 5th, 2022)
-
-- Support HTTP/2 on HTTPS tunnelling proxies. (#468)
-- Fix proxy headers missing on HTTP forwarding. (#456)
-- Only instantiate SSL context if required. (#457)
-- More robust HTTP/2 handling. (#253, #439, #440, #441)
-
-## 0.14.3 (November 17th, 2021)
-
-- Fix race condition when removing closed connections from the pool. (#437)
-
-## 0.14.2 (November 16th, 2021)
-
-- Failed connections no longer remain in the pool. (Pull #433)
-
-## 0.14.1 (November 12th, 2021)
-
-- `max_connections` becomes optional. (Pull #429)
-- `certifi` is now included in the install dependancies. (Pull #428)
-- `h2` is now strictly optional. (Pull #428)
-
-## 0.14.0 (November 11th, 2021)
-
-The 0.14 release is a complete reworking of `httpcore`, comprehensively addressing some underlying issues in the connection pooling, as well as substantially redesigning the API to be more user friendly.
-
-Some of the lower-level API design also makes the components more easily testable in isolation, and the package now has 100% test coverage.
-
-See [discussion #419](https://github.com/encode/httpcore/discussions/419) for a little more background.
-
-There's some other neat bits in there too, such as the "trace" extension, which gives a hook into inspecting the internal events that occur during the request/response cycle. This extension is needed for the HTTPX cli, in order to...
-
-* Log the point at which the connection is established, and the IP/port on which it is made.
-* Determine if the outgoing request should log as HTTP/1.1 or HTTP/2, rather than having to assume it's HTTP/2 if the --http2 flag was passed. (Which may not actually be true.)
-* Log SSL version info / certificate info.
-
-Note that `curio` support is not currently available in 0.14.0. If you're using `httpcore` with `curio` please get in touch, so we can assess if we ought to prioritize it as a feature or not.
-
-## 0.13.7 (September 13th, 2021)
-
-- Fix broken error messaging when URL scheme is missing, or a non HTTP(S) scheme is used. (Pull #403)
-
-## 0.13.6 (June 15th, 2021)
-
-### Fixed
-
-- Close sockets when read or write timeouts occur. (Pull #365)
-
-## 0.13.5 (June 14th, 2021)
-
-### Fixed
-
-- Resolved niggles with AnyIO EOF behaviours. (Pull #358, #362)
-
-## 0.13.4 (June 9th, 2021)
-
-### Added
-
-- Improved error messaging when URL scheme is missing, or a non HTTP(S) scheme is used. (Pull #354)
-
-### Fixed
-
-- Switched to `anyio` as the default backend implementation when running with `asyncio`. Resolves some awkward [TLS timeout issues](https://github.com/encode/httpx/discussions/1511).
-
-## 0.13.3 (May 6th, 2021)
-
-### Added
-
-- Support HTTP/2 prior knowledge, using `httpcore.SyncConnectionPool(http1=False)`. (Pull #333)
-
-### Fixed
-
-- Handle cases where environment does not provide `select.poll` support. (Pull #331)
-
-## 0.13.2 (April 29th, 2021)
-
-### Added
-
-- Improve error message for specific case of `RemoteProtocolError` where server disconnects without sending a response. (Pull #313)
-
-## 0.13.1 (April 28th, 2021)
-
-### Fixed
-
-- More resiliant testing for closed connections. (Pull #311)
-- Don't raise exceptions on ungraceful connection closes. (Pull #310)
-
-## 0.13.0 (April 21st, 2021)
-
-The 0.13 release updates the core API in order to match the HTTPX Transport API,
-introduced in HTTPX 0.18 onwards.
-
-An example of making requests with the new interface is:
-
-```python
-with httpcore.SyncConnectionPool() as http:
- status_code, headers, stream, extensions = http.handle_request(
- method=b'GET',
- url=(b'https', b'example.org', 443, b'/'),
- headers=[(b'host', b'example.org'), (b'user-agent', b'httpcore')]
- stream=httpcore.ByteStream(b''),
- extensions={}
- )
- body = stream.read()
- print(status_code, body)
-```
-
-### Changed
-
-- The `.request()` method is now `handle_request()`. (Pull #296)
-- The `.arequest()` method is now `.handle_async_request()`. (Pull #296)
-- The `headers` argument is no longer optional. (Pull #296)
-- The `stream` argument is no longer optional. (Pull #296)
-- The `ext` argument is now named `extensions`, and is no longer optional. (Pull #296)
-- The `"reason"` extension keyword is now named `"reason_phrase"`. (Pull #296)
-- The `"reason_phrase"` and `"http_version"` extensions now use byte strings for their values. (Pull #296)
-- The `httpcore.PlainByteStream()` class becomes `httpcore.ByteStream()`. (Pull #296)
-
-### Added
-
-- Streams now support a `.read()` interface. (Pull #296)
-
-### Fixed
-
-- Task cancellation no longer leaks connections from the connection pool. (Pull #305)
-
-## 0.12.3 (December 7th, 2020)
-
-### Fixed
-
-- Abort SSL connections on close rather than waiting for remote EOF when using `asyncio`. (Pull #167)
-- Fix exception raised in case of connect timeouts when using the `anyio` backend. (Pull #236)
-- Fix `Host` header precedence for `:authority` in HTTP/2. (Pull #241, #243)
-- Handle extra edge case when detecting for socket readability when using `asyncio`. (Pull #242, #244)
-- Fix `asyncio` SSL warning when using proxy tunneling. (Pull #249)
-
-## 0.12.2 (November 20th, 2020)
-
-### Fixed
-
-- Properly wrap connect errors on the asyncio backend. (Pull #235)
-- Fix `ImportError` occurring on Python 3.9 when using the HTTP/1.1 sync client in a multithreaded context. (Pull #237)
-
-## 0.12.1 (November 7th, 2020)
-
-### Added
-
-- Add connect retries. (Pull #221)
-
-### Fixed
-
-- Tweak detection of dropped connections, resolving an issue with open files limits on Linux. (Pull #185)
-- Avoid leaking connections when establishing an HTTP tunnel to a proxy has failed. (Pull #223)
-- Properly wrap OS errors when using `trio`. (Pull #225)
-
-## 0.12.0 (October 6th, 2020)
-
-### Changed
-
-- HTTP header casing is now preserved, rather than always sent in lowercase. (#216 and python-hyper/h11#104)
-
-### Added
-
-- Add Python 3.9 to officially supported versions.
-
-### Fixed
-
-- Gracefully handle a stdlib asyncio bug when a connection is closed while it is in a paused-for-reading state. (#201)
-
-## 0.11.1 (September 28nd, 2020)
-
-### Fixed
-
-- Add await to async semaphore release() coroutine (#197)
-- Drop incorrect curio classifier (#192)
-
-## 0.11.0 (September 22nd, 2020)
-
-The Transport API with 0.11.0 has a couple of significant changes.
-
-Firstly we've moved changed the request interface in order to allow extensions, which will later enable us to support features
-such as trailing headers, HTTP/2 server push, and CONNECT/Upgrade connections.
-
-The interface changes from:
-
-```python
-def request(method, url, headers, stream, timeout):
- return (http_version, status_code, reason, headers, stream)
-```
-
-To instead including an optional dictionary of extensions on the request and response:
-
-```python
-def request(method, url, headers, stream, ext):
- return (status_code, headers, stream, ext)
-```
-
-Having an open-ended extensions point will allow us to add later support for various optional features, that wouldn't otherwise be supported without these API changes.
-
-In particular:
-
-* Trailing headers support.
-* HTTP/2 Server Push
-* sendfile.
-* Exposing raw connection on CONNECT, Upgrade, HTTP/2 bi-di streaming.
-* Exposing debug information out of the API, including template name, template context.
-
-Currently extensions are limited to:
-
-* request: `timeout` - Optional. Timeout dictionary.
-* response: `http_version` - Optional. Include the HTTP version used on the response.
-* response: `reason` - Optional. Include the reason phrase used on the response. Only valid with HTTP/1.*.
-
-See https://github.com/encode/httpx/issues/1274#issuecomment-694884553 for the history behind this.
-
-Secondly, the async version of `request` is now namespaced as `arequest`.
-
-This allows concrete transports to support both sync and async implementations on the same class.
-
-### Added
-
-- Add curio support. (Pull #168)
-- Add anyio support, with `backend="anyio"`. (Pull #169)
-
-### Changed
-
-- Update the Transport API to use 'ext' for optional extensions. (Pull #190)
-- Update the Transport API to use `.request` and `.arequest` so implementations can support both sync and async. (Pull #189)
-
-## 0.10.2 (August 20th, 2020)
-
-### Added
-
-- Added Unix Domain Socket support. (Pull #139)
-
-### Fixed
-
-- Always include the port on proxy CONNECT requests. (Pull #154)
-- Fix `max_keepalive_connections` configuration. (Pull #153)
-- Fixes behaviour in HTTP/1.1 where server disconnects can be used to signal the end of the response body. (Pull #164)
-
-## 0.10.1 (August 7th, 2020)
-
-- Include `max_keepalive_connections` on `AsyncHTTPProxy`/`SyncHTTPProxy` classes.
-
-## 0.10.0 (August 7th, 2020)
-
-The most notable change in the 0.10.0 release is that HTTP/2 support is now fully optional.
-
-Use either `pip install httpcore` for HTTP/1.1 support only, or `pip install httpcore[http2]` for HTTP/1.1 and HTTP/2 support.
-
-### Added
-
-- HTTP/2 support becomes optional. (Pull #121, #130)
-- Add `local_address=...` support. (Pull #100, #134)
-- Add `PlainByteStream`, `IteratorByteStream`, `AsyncIteratorByteStream`. The `AsyncByteSteam` and `SyncByteStream` classes are now pure interface classes. (#133)
-- Add `LocalProtocolError`, `RemoteProtocolError` exceptions. (Pull #129)
-- Add `UnsupportedProtocol` exception. (Pull #128)
-- Add `.get_connection_info()` method. (Pull #102, #137)
-- Add better TRACE logs. (Pull #101)
-
-### Changed
-
-- `max_keepalive` is deprecated in favour of `max_keepalive_connections`. (Pull #140)
-
-### Fixed
-
-- Improve handling of server disconnects. (Pull #112)
-
-## 0.9.1 (May 27th, 2020)
-
-### Fixed
-
-- Proper host resolution for sync case, including IPv6 support. (Pull #97)
-- Close outstanding connections when connection pool is closed. (Pull #98)
-
-## 0.9.0 (May 21th, 2020)
-
-### Changed
-
-- URL port becomes an `Optional[int]` instead of `int`. (Pull #92)
-
-### Fixed
-
-- Honor HTTP/2 max concurrent streams settings. (Pull #89, #90)
-- Remove incorrect debug log. (Pull #83)
-
-## 0.8.4 (May 11th, 2020)
-
-### Added
-
-- Logging via HTTPCORE_LOG_LEVEL and HTTPX_LOG_LEVEL environment variables
-and TRACE level logging. (Pull #79)
-
-### Fixed
-
-- Reuse of connections on HTTP/2 in close concurrency situations. (Pull #81)
-
-## 0.8.3 (May 6rd, 2020)
-
-### Fixed
-
-- Include `Host` and `Accept` headers on proxy "CONNECT" requests.
-- De-duplicate any headers also contained in proxy_headers.
-- HTTP/2 flag not being passed down to proxy connections.
-
-## 0.8.2 (May 3rd, 2020)
-
-### Fixed
-
-- Fix connections using proxy forwarding requests not being added to the
-connection pool properly. (Pull #70)
-
-## 0.8.1 (April 30th, 2020)
-
-### Changed
-
-- Allow inherintance of both `httpcore.AsyncByteStream`, `httpcore.SyncByteStream` without type conflicts.
-
-## 0.8.0 (April 30th, 2020)
-
-### Fixed
-
-- Fixed tunnel proxy support.
-
-### Added
-
-- New `TimeoutException` base class.
-
-## 0.7.0 (March 5th, 2020)
-
-- First integration with HTTPX.
diff --git a/contrib/python/httpcore/.dist-info/top_level.txt b/contrib/python/httpcore/.dist-info/top_level.txt
deleted file mode 100644
index 613e43507bb..00000000000
--- a/contrib/python/httpcore/.dist-info/top_level.txt
+++ /dev/null
@@ -1,4 +0,0 @@
-httpcore
-httpcore/_async
-httpcore/_backends
-httpcore/_sync
diff --git a/contrib/python/httpcore/LICENSE.md b/contrib/python/httpcore/LICENSE.md
deleted file mode 100644
index 311b2b56c53..00000000000
--- a/contrib/python/httpcore/LICENSE.md
+++ /dev/null
@@ -1,27 +0,0 @@
-Copyright © 2020, [Encode OSS Ltd](https://www.encode.io/).
-All rights reserved.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are met:
-
-* Redistributions of source code must retain the above copyright notice, this
- list of conditions and the following disclaimer.
-
-* Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
-
-* Neither the name of the copyright holder nor the names of its
- contributors may be used to endorse or promote products derived from
- this software without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
-FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
-CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
-OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/contrib/python/httpcore/README.md b/contrib/python/httpcore/README.md
deleted file mode 100644
index 4567ba44b40..00000000000
--- a/contrib/python/httpcore/README.md
+++ /dev/null
@@ -1,111 +0,0 @@
-# HTTP Core
-
-[![Test Suite](https://github.com/encode/httpcore/workflows/Test%20Suite/badge.svg)](https://github.com/encode/httpcore/actions)
-[![Package version](https://badge.fury.io/py/httpcore.svg)](https://pypi.org/project/httpcore/)
-
-> *Do one thing, and do it well.*
-
-The HTTP Core package provides a minimal low-level HTTP client, which does
-one thing only. Sending HTTP requests.
-
-It does not provide any high level model abstractions over the API,
-does not handle redirects, multipart uploads, building authentication headers,
-transparent HTTP caching, URL parsing, session cookie handling,
-content or charset decoding, handling JSON, environment based configuration
-defaults, or any of that Jazz.
-
-Some things HTTP Core does do:
-
-* Sending HTTP requests.
-* Thread-safe / task-safe connection pooling.
-* HTTP(S) proxy & SOCKS proxy support.
-* Supports HTTP/1.1 and HTTP/2.
-* Provides both sync and async interfaces.
-* Async backend support for `asyncio` and `trio`.
-
-## Requirements
-
-Python 3.8+
-
-## Installation
-
-For HTTP/1.1 only support, install with:
-
-```shell
-$ pip install httpcore
-```
-
-There are also a number of optional extras available...
-
-```shell
-$ pip install httpcore['asyncio,trio,http2,socks']
-```
-
-## Sending requests
-
-Send an HTTP request:
-
-```python
-import httpcore
-
-response = httpcore.request("GET", "https://www.example.com/")
-
-print(response)
-# <Response [200]>
-print(response.status)
-# 200
-print(response.headers)
-# [(b'Accept-Ranges', b'bytes'), (b'Age', b'557328'), (b'Cache-Control', b'max-age=604800'), ...]
-print(response.content)
-# b'<!doctype html>\n<html>\n<head>\n<title>Example Domain</title>\n\n<meta charset="utf-8"/>\n ...'
-```
-
-The top-level `httpcore.request()` function is provided for convenience. In practice whenever you're working with `httpcore` you'll want to use the connection pooling functionality that it provides.
-
-```python
-import httpcore
-
-http = httpcore.ConnectionPool()
-response = http.request("GET", "https://www.example.com/")
-```
-
-Once you're ready to get going, [head over to the documentation](https://www.encode.io/httpcore/).
-
-## Motivation
-
-You *probably* don't want to be using HTTP Core directly. It might make sense if
-you're writing something like a proxy service in Python, and you just want
-something at the lowest possible level, but more typically you'll want to use
-a higher level client library, such as `httpx`.
-
-The motivation for `httpcore` is:
-
-* To provide a reusable low-level client library, that other packages can then build on top of.
-* To provide a *really clear interface split* between the networking code and client logic,
- so that each is easier to understand and reason about in isolation.
-
-## Dependencies
-
-The `httpcore` package has the following dependencies...
-
-* `h11`
-* `certifi`
-
-And the following optional extras...
-
-* `anyio` - Required by `pip install httpcore['asyncio']`.
-* `trio` - Required by `pip install httpcore['trio']`.
-* `h2` - Required by `pip install httpcore['http2']`.
-* `socksio` - Required by `pip install httpcore['socks']`.
-
-## Versioning
-
-We use [SEMVER for our versioning policy](https://semver.org/).
-
-For changes between package versions please see our [project changelog](CHANGELOG.md).
-
-We recommend pinning your requirements either the most current major version, or a more specific version range:
-
-```python
-pip install 'httpcore==1.*'
-```
diff --git a/contrib/python/httpcore/httpcore/__init__.py b/contrib/python/httpcore/httpcore/__init__.py
deleted file mode 100644
index 662b1563a1e..00000000000
--- a/contrib/python/httpcore/httpcore/__init__.py
+++ /dev/null
@@ -1,140 +0,0 @@
-from ._api import request, stream
-from ._async import (
- AsyncConnectionInterface,
- AsyncConnectionPool,
- AsyncHTTP2Connection,
- AsyncHTTP11Connection,
- AsyncHTTPConnection,
- AsyncHTTPProxy,
- AsyncSOCKSProxy,
-)
-from ._backends.base import (
- SOCKET_OPTION,
- AsyncNetworkBackend,
- AsyncNetworkStream,
- NetworkBackend,
- NetworkStream,
-)
-from ._backends.mock import AsyncMockBackend, AsyncMockStream, MockBackend, MockStream
-from ._backends.sync import SyncBackend
-from ._exceptions import (
- ConnectError,
- ConnectionNotAvailable,
- ConnectTimeout,
- LocalProtocolError,
- NetworkError,
- PoolTimeout,
- ProtocolError,
- ProxyError,
- ReadError,
- ReadTimeout,
- RemoteProtocolError,
- TimeoutException,
- UnsupportedProtocol,
- WriteError,
- WriteTimeout,
-)
-from ._models import URL, Origin, Proxy, Request, Response
-from ._ssl import default_ssl_context
-from ._sync import (
- ConnectionInterface,
- ConnectionPool,
- HTTP2Connection,
- HTTP11Connection,
- HTTPConnection,
- HTTPProxy,
- SOCKSProxy,
-)
-
-# The 'httpcore.AnyIOBackend' class is conditional on 'anyio' being installed.
-try:
- from ._backends.anyio import AnyIOBackend
-except ImportError: # pragma: nocover
-
- class AnyIOBackend: # type: ignore
- def __init__(self, *args, **kwargs): # type: ignore
- msg = (
- "Attempted to use 'httpcore.AnyIOBackend' but 'anyio' is not installed."
- )
- raise RuntimeError(msg)
-
-
-# The 'httpcore.TrioBackend' class is conditional on 'trio' being installed.
-try:
- from ._backends.trio import TrioBackend
-except ImportError: # pragma: nocover
-
- class TrioBackend: # type: ignore
- def __init__(self, *args, **kwargs): # type: ignore
- msg = "Attempted to use 'httpcore.TrioBackend' but 'trio' is not installed."
- raise RuntimeError(msg)
-
-
-__all__ = [
- # top-level requests
- "request",
- "stream",
- # models
- "Origin",
- "URL",
- "Request",
- "Response",
- "Proxy",
- # async
- "AsyncHTTPConnection",
- "AsyncConnectionPool",
- "AsyncHTTPProxy",
- "AsyncHTTP11Connection",
- "AsyncHTTP2Connection",
- "AsyncConnectionInterface",
- "AsyncSOCKSProxy",
- # sync
- "HTTPConnection",
- "ConnectionPool",
- "HTTPProxy",
- "HTTP11Connection",
- "HTTP2Connection",
- "ConnectionInterface",
- "SOCKSProxy",
- # network backends, implementations
- "SyncBackend",
- "AnyIOBackend",
- "TrioBackend",
- # network backends, mock implementations
- "AsyncMockBackend",
- "AsyncMockStream",
- "MockBackend",
- "MockStream",
- # network backends, interface
- "AsyncNetworkStream",
- "AsyncNetworkBackend",
- "NetworkStream",
- "NetworkBackend",
- # util
- "default_ssl_context",
- "SOCKET_OPTION",
- # exceptions
- "ConnectionNotAvailable",
- "ProxyError",
- "ProtocolError",
- "LocalProtocolError",
- "RemoteProtocolError",
- "UnsupportedProtocol",
- "TimeoutException",
- "PoolTimeout",
- "ConnectTimeout",
- "ReadTimeout",
- "WriteTimeout",
- "NetworkError",
- "ConnectError",
- "ReadError",
- "WriteError",
-]
-
-__version__ = "1.0.7"
-
-
-__locals = locals()
-for __name in __all__:
- if not __name.startswith("__"):
- setattr(__locals[__name], "__module__", "httpcore") # noqa
diff --git a/contrib/python/httpcore/httpcore/_api.py b/contrib/python/httpcore/httpcore/_api.py
deleted file mode 100644
index 38b961d10de..00000000000
--- a/contrib/python/httpcore/httpcore/_api.py
+++ /dev/null
@@ -1,94 +0,0 @@
-from __future__ import annotations
-
-import contextlib
-import typing
-
-from ._models import URL, Extensions, HeaderTypes, Response
-from ._sync.connection_pool import ConnectionPool
-
-
-def request(
- method: bytes | str,
- url: URL | bytes | str,
- *,
- headers: HeaderTypes = None,
- content: bytes | typing.Iterator[bytes] | None = None,
- extensions: Extensions | None = None,
-) -> Response:
- """
- Sends an HTTP request, returning the response.
-
- ```
- response = httpcore.request("GET", "https://www.example.com/")
- ```
-
- Arguments:
- method: The HTTP method for the request. Typically one of `"GET"`,
- `"OPTIONS"`, `"HEAD"`, `"POST"`, `"PUT"`, `"PATCH"`, or `"DELETE"`.
- url: The URL of the HTTP request. Either as an instance of `httpcore.URL`,
- or as str/bytes.
- headers: The HTTP request headers. Either as a dictionary of str/bytes,
- or as a list of two-tuples of str/bytes.
- content: The content of the request body. Either as bytes,
- or as a bytes iterator.
- extensions: A dictionary of optional extra information included on the request.
- Possible keys include `"timeout"`.
-
- Returns:
- An instance of `httpcore.Response`.
- """
- with ConnectionPool() as pool:
- return pool.request(
- method=method,
- url=url,
- headers=headers,
- content=content,
- extensions=extensions,
- )
-
-
-def stream(
- method: bytes | str,
- url: URL | bytes | str,
- *,
- headers: HeaderTypes = None,
- content: bytes | typing.Iterator[bytes] | None = None,
- extensions: Extensions | None = None,
-) -> typing.Iterator[Response]:
- """
- Sends an HTTP request, returning the response within a content manager.
-
- ```
- with httpcore.stream("GET", "https://www.example.com/") as response:
- ...
- ```
-
- When using the `stream()` function, the body of the response will not be
- automatically read. If you want to access the response body you should
- either use `content = response.read()`, or `for chunk in response.iter_content()`.
-
- Arguments:
- method: The HTTP method for the request. Typically one of `"GET"`,
- `"OPTIONS"`, `"HEAD"`, `"POST"`, `"PUT"`, `"PATCH"`, or `"DELETE"`.
- url: The URL of the HTTP request. Either as an instance of `httpcore.URL`,
- or as str/bytes.
- headers: The HTTP request headers. Either as a dictionary of str/bytes,
- or as a list of two-tuples of str/bytes.
- content: The content of the request body. Either as bytes,
- or as a bytes iterator.
- extensions: A dictionary of optional extra information included on the request.
- Possible keys include `"timeout"`.
-
- Returns:
- An instance of `httpcore.Response`.
- """
- with ConnectionPool() as pool:
- with pool.stream(
- method=method,
- url=url,
- headers=headers,
- content=content,
- extensions=extensions,
- ) as response:
- yield response
diff --git a/contrib/python/httpcore/httpcore/_async/__init__.py b/contrib/python/httpcore/httpcore/_async/__init__.py
deleted file mode 100644
index 88dc7f01e13..00000000000
--- a/contrib/python/httpcore/httpcore/_async/__init__.py
+++ /dev/null
@@ -1,39 +0,0 @@
-from .connection import AsyncHTTPConnection
-from .connection_pool import AsyncConnectionPool
-from .http11 import AsyncHTTP11Connection
-from .http_proxy import AsyncHTTPProxy
-from .interfaces import AsyncConnectionInterface
-
-try:
- from .http2 import AsyncHTTP2Connection
-except ImportError: # pragma: nocover
-
- class AsyncHTTP2Connection: # type: ignore
- def __init__(self, *args, **kwargs) -> None: # type: ignore
- raise RuntimeError(
- "Attempted to use http2 support, but the `h2` package is not "
- "installed. Use 'pip install httpcore[http2]'."
- )
-
-
-try:
- from .socks_proxy import AsyncSOCKSProxy
-except ImportError: # pragma: nocover
-
- class AsyncSOCKSProxy: # type: ignore
- def __init__(self, *args, **kwargs) -> None: # type: ignore
- raise RuntimeError(
- "Attempted to use SOCKS support, but the `socksio` package is not "
- "installed. Use 'pip install httpcore[socks]'."
- )
-
-
-__all__ = [
- "AsyncHTTPConnection",
- "AsyncConnectionPool",
- "AsyncHTTPProxy",
- "AsyncHTTP11Connection",
- "AsyncHTTP2Connection",
- "AsyncConnectionInterface",
- "AsyncSOCKSProxy",
-]
diff --git a/contrib/python/httpcore/httpcore/_async/connection.py b/contrib/python/httpcore/httpcore/_async/connection.py
deleted file mode 100644
index b42581dff8a..00000000000
--- a/contrib/python/httpcore/httpcore/_async/connection.py
+++ /dev/null
@@ -1,222 +0,0 @@
-from __future__ import annotations
-
-import itertools
-import logging
-import ssl
-import types
-import typing
-
-from .._backends.auto import AutoBackend
-from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend, AsyncNetworkStream
-from .._exceptions import ConnectError, ConnectTimeout
-from .._models import Origin, Request, Response
-from .._ssl import default_ssl_context
-from .._synchronization import AsyncLock
-from .._trace import Trace
-from .http11 import AsyncHTTP11Connection
-from .interfaces import AsyncConnectionInterface
-
-RETRIES_BACKOFF_FACTOR = 0.5 # 0s, 0.5s, 1s, 2s, 4s, etc.
-
-
-logger = logging.getLogger("httpcore.connection")
-
-
-def exponential_backoff(factor: float) -> typing.Iterator[float]:
- """
- Generate a geometric sequence that has a ratio of 2 and starts with 0.
-
- For example:
- - `factor = 2`: `0, 2, 4, 8, 16, 32, 64, ...`
- - `factor = 3`: `0, 3, 6, 12, 24, 48, 96, ...`
- """
- yield 0
- for n in itertools.count():
- yield factor * 2**n
-
-
-class AsyncHTTPConnection(AsyncConnectionInterface):
- def __init__(
- self,
- origin: Origin,
- ssl_context: ssl.SSLContext | None = None,
- keepalive_expiry: float | None = None,
- http1: bool = True,
- http2: bool = False,
- retries: int = 0,
- local_address: str | None = None,
- uds: str | None = None,
- network_backend: AsyncNetworkBackend | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> None:
- self._origin = origin
- self._ssl_context = ssl_context
- self._keepalive_expiry = keepalive_expiry
- self._http1 = http1
- self._http2 = http2
- self._retries = retries
- self._local_address = local_address
- self._uds = uds
-
- self._network_backend: AsyncNetworkBackend = (
- AutoBackend() if network_backend is None else network_backend
- )
- self._connection: AsyncConnectionInterface | None = None
- self._connect_failed: bool = False
- self._request_lock = AsyncLock()
- self._socket_options = socket_options
-
- async def handle_async_request(self, request: Request) -> Response:
- if not self.can_handle_request(request.url.origin):
- raise RuntimeError(
- f"Attempted to send request to {request.url.origin} on connection to {self._origin}"
- )
-
- try:
- async with self._request_lock:
- if self._connection is None:
- stream = await self._connect(request)
-
- ssl_object = stream.get_extra_info("ssl_object")
- http2_negotiated = (
- ssl_object is not None
- and ssl_object.selected_alpn_protocol() == "h2"
- )
- if http2_negotiated or (self._http2 and not self._http1):
- from .http2 import AsyncHTTP2Connection
-
- self._connection = AsyncHTTP2Connection(
- origin=self._origin,
- stream=stream,
- keepalive_expiry=self._keepalive_expiry,
- )
- else:
- self._connection = AsyncHTTP11Connection(
- origin=self._origin,
- stream=stream,
- keepalive_expiry=self._keepalive_expiry,
- )
- except BaseException as exc:
- self._connect_failed = True
- raise exc
-
- return await self._connection.handle_async_request(request)
-
- async def _connect(self, request: Request) -> AsyncNetworkStream:
- timeouts = request.extensions.get("timeout", {})
- sni_hostname = request.extensions.get("sni_hostname", None)
- timeout = timeouts.get("connect", None)
-
- retries_left = self._retries
- delays = exponential_backoff(factor=RETRIES_BACKOFF_FACTOR)
-
- while True:
- try:
- if self._uds is None:
- kwargs = {
- "host": self._origin.host.decode("ascii"),
- "port": self._origin.port,
- "local_address": self._local_address,
- "timeout": timeout,
- "socket_options": self._socket_options,
- }
- async with Trace("connect_tcp", logger, request, kwargs) as trace:
- stream = await self._network_backend.connect_tcp(**kwargs)
- trace.return_value = stream
- else:
- kwargs = {
- "path": self._uds,
- "timeout": timeout,
- "socket_options": self._socket_options,
- }
- async with Trace(
- "connect_unix_socket", logger, request, kwargs
- ) as trace:
- stream = await self._network_backend.connect_unix_socket(
- **kwargs
- )
- trace.return_value = stream
-
- if self._origin.scheme in (b"https", b"wss"):
- ssl_context = (
- default_ssl_context()
- if self._ssl_context is None
- else self._ssl_context
- )
- alpn_protocols = ["http/1.1", "h2"] if self._http2 else ["http/1.1"]
- ssl_context.set_alpn_protocols(alpn_protocols)
-
- kwargs = {
- "ssl_context": ssl_context,
- "server_hostname": sni_hostname
- or self._origin.host.decode("ascii"),
- "timeout": timeout,
- }
- async with Trace("start_tls", logger, request, kwargs) as trace:
- stream = await stream.start_tls(**kwargs)
- trace.return_value = stream
- return stream
- except (ConnectError, ConnectTimeout):
- if retries_left <= 0:
- raise
- retries_left -= 1
- delay = next(delays)
- async with Trace("retry", logger, request, kwargs) as trace:
- await self._network_backend.sleep(delay)
-
- def can_handle_request(self, origin: Origin) -> bool:
- return origin == self._origin
-
- async def aclose(self) -> None:
- if self._connection is not None:
- async with Trace("close", logger, None, {}):
- await self._connection.aclose()
-
- def is_available(self) -> bool:
- if self._connection is None:
- # If HTTP/2 support is enabled, and the resulting connection could
- # end up as HTTP/2 then we should indicate the connection as being
- # available to service multiple requests.
- return (
- self._http2
- and (self._origin.scheme == b"https" or not self._http1)
- and not self._connect_failed
- )
- return self._connection.is_available()
-
- def has_expired(self) -> bool:
- if self._connection is None:
- return self._connect_failed
- return self._connection.has_expired()
-
- def is_idle(self) -> bool:
- if self._connection is None:
- return self._connect_failed
- return self._connection.is_idle()
-
- def is_closed(self) -> bool:
- if self._connection is None:
- return self._connect_failed
- return self._connection.is_closed()
-
- def info(self) -> str:
- if self._connection is None:
- return "CONNECTION FAILED" if self._connect_failed else "CONNECTING"
- return self._connection.info()
-
- def __repr__(self) -> str:
- return f"<{self.__class__.__name__} [{self.info()}]>"
-
- # These context managers are not used in the standard flow, but are
- # useful for testing or working with connection instances directly.
-
- async def __aenter__(self) -> AsyncHTTPConnection:
- return self
-
- async def __aexit__(
- self,
- exc_type: type[BaseException] | None = None,
- exc_value: BaseException | None = None,
- traceback: types.TracebackType | None = None,
- ) -> None:
- await self.aclose()
diff --git a/contrib/python/httpcore/httpcore/_async/connection_pool.py b/contrib/python/httpcore/httpcore/_async/connection_pool.py
deleted file mode 100644
index 96e973d0ce2..00000000000
--- a/contrib/python/httpcore/httpcore/_async/connection_pool.py
+++ /dev/null
@@ -1,420 +0,0 @@
-from __future__ import annotations
-
-import ssl
-import sys
-import types
-import typing
-
-from .._backends.auto import AutoBackend
-from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend
-from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
-from .._models import Origin, Proxy, Request, Response
-from .._synchronization import AsyncEvent, AsyncShieldCancellation, AsyncThreadLock
-from .connection import AsyncHTTPConnection
-from .interfaces import AsyncConnectionInterface, AsyncRequestInterface
-
-
-class AsyncPoolRequest:
- def __init__(self, request: Request) -> None:
- self.request = request
- self.connection: AsyncConnectionInterface | None = None
- self._connection_acquired = AsyncEvent()
-
- def assign_to_connection(self, connection: AsyncConnectionInterface | None) -> None:
- self.connection = connection
- self._connection_acquired.set()
-
- def clear_connection(self) -> None:
- self.connection = None
- self._connection_acquired = AsyncEvent()
-
- async def wait_for_connection(
- self, timeout: float | None = None
- ) -> AsyncConnectionInterface:
- if self.connection is None:
- await self._connection_acquired.wait(timeout=timeout)
- assert self.connection is not None
- return self.connection
-
- def is_queued(self) -> bool:
- return self.connection is None
-
-
-class AsyncConnectionPool(AsyncRequestInterface):
- """
- A connection pool for making HTTP requests.
- """
-
- def __init__(
- self,
- ssl_context: ssl.SSLContext | None = None,
- proxy: Proxy | None = None,
- max_connections: int | None = 10,
- max_keepalive_connections: int | None = None,
- keepalive_expiry: float | None = None,
- http1: bool = True,
- http2: bool = False,
- retries: int = 0,
- local_address: str | None = None,
- uds: str | None = None,
- network_backend: AsyncNetworkBackend | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> None:
- """
- A connection pool for making HTTP requests.
-
- Parameters:
- ssl_context: An SSL context to use for verifying connections.
- If not specified, the default `httpcore.default_ssl_context()`
- will be used.
- max_connections: The maximum number of concurrent HTTP connections that
- the pool should allow. Any attempt to send a request on a pool that
- would exceed this amount will block until a connection is available.
- max_keepalive_connections: The maximum number of idle HTTP connections
- that will be maintained in the pool.
- keepalive_expiry: The duration in seconds that an idle HTTP connection
- may be maintained for before being expired from the pool.
- http1: A boolean indicating if HTTP/1.1 requests should be supported
- by the connection pool. Defaults to True.
- http2: A boolean indicating if HTTP/2 requests should be supported by
- the connection pool. Defaults to False.
- retries: The maximum number of retries when trying to establish a
- connection.
- local_address: Local address to connect from. Can also be used to connect
- using a particular address family. Using `local_address="0.0.0.0"`
- will connect using an `AF_INET` address (IPv4), while using
- `local_address="::"` will connect using an `AF_INET6` address (IPv6).
- uds: Path to a Unix Domain Socket to use instead of TCP sockets.
- network_backend: A backend instance to use for handling network I/O.
- socket_options: Socket options that have to be included
- in the TCP socket when the connection was established.
- """
- self._ssl_context = ssl_context
- self._proxy = proxy
- self._max_connections = (
- sys.maxsize if max_connections is None else max_connections
- )
- self._max_keepalive_connections = (
- sys.maxsize
- if max_keepalive_connections is None
- else max_keepalive_connections
- )
- self._max_keepalive_connections = min(
- self._max_connections, self._max_keepalive_connections
- )
-
- self._keepalive_expiry = keepalive_expiry
- self._http1 = http1
- self._http2 = http2
- self._retries = retries
- self._local_address = local_address
- self._uds = uds
-
- self._network_backend = (
- AutoBackend() if network_backend is None else network_backend
- )
- self._socket_options = socket_options
-
- # The mutable state on a connection pool is the queue of incoming requests,
- # and the set of connections that are servicing those requests.
- self._connections: list[AsyncConnectionInterface] = []
- self._requests: list[AsyncPoolRequest] = []
-
- # We only mutate the state of the connection pool within an 'optional_thread_lock'
- # context. This holds a threading lock unless we're running in async mode,
- # in which case it is a no-op.
- self._optional_thread_lock = AsyncThreadLock()
-
- def create_connection(self, origin: Origin) -> AsyncConnectionInterface:
- if self._proxy is not None:
- if self._proxy.url.scheme in (b"socks5", b"socks5h"):
- from .socks_proxy import AsyncSocks5Connection
-
- return AsyncSocks5Connection(
- proxy_origin=self._proxy.url.origin,
- proxy_auth=self._proxy.auth,
- remote_origin=origin,
- ssl_context=self._ssl_context,
- keepalive_expiry=self._keepalive_expiry,
- http1=self._http1,
- http2=self._http2,
- network_backend=self._network_backend,
- )
- elif origin.scheme == b"http":
- from .http_proxy import AsyncForwardHTTPConnection
-
- return AsyncForwardHTTPConnection(
- proxy_origin=self._proxy.url.origin,
- proxy_headers=self._proxy.headers,
- proxy_ssl_context=self._proxy.ssl_context,
- remote_origin=origin,
- keepalive_expiry=self._keepalive_expiry,
- network_backend=self._network_backend,
- )
- from .http_proxy import AsyncTunnelHTTPConnection
-
- return AsyncTunnelHTTPConnection(
- proxy_origin=self._proxy.url.origin,
- proxy_headers=self._proxy.headers,
- proxy_ssl_context=self._proxy.ssl_context,
- remote_origin=origin,
- ssl_context=self._ssl_context,
- keepalive_expiry=self._keepalive_expiry,
- http1=self._http1,
- http2=self._http2,
- network_backend=self._network_backend,
- )
-
- return AsyncHTTPConnection(
- origin=origin,
- ssl_context=self._ssl_context,
- keepalive_expiry=self._keepalive_expiry,
- http1=self._http1,
- http2=self._http2,
- retries=self._retries,
- local_address=self._local_address,
- uds=self._uds,
- network_backend=self._network_backend,
- socket_options=self._socket_options,
- )
-
- @property
- def connections(self) -> list[AsyncConnectionInterface]:
- """
- Return a list of the connections currently in the pool.
-
- For example:
-
- ```python
- >>> pool.connections
- [
- <AsyncHTTPConnection ['https://example.com:443', HTTP/1.1, ACTIVE, Request Count: 6]>,
- <AsyncHTTPConnection ['https://example.com:443', HTTP/1.1, IDLE, Request Count: 9]> ,
- <AsyncHTTPConnection ['http://example.com:80', HTTP/1.1, IDLE, Request Count: 1]>,
- ]
- ```
- """
- return list(self._connections)
-
- async def handle_async_request(self, request: Request) -> Response:
- """
- Send an HTTP request, and return an HTTP response.
-
- This is the core implementation that is called into by `.request()` or `.stream()`.
- """
- scheme = request.url.scheme.decode()
- if scheme == "":
- raise UnsupportedProtocol(
- "Request URL is missing an 'http://' or 'https://' protocol."
- )
- if scheme not in ("http", "https", "ws", "wss"):
- raise UnsupportedProtocol(
- f"Request URL has an unsupported protocol '{scheme}://'."
- )
-
- timeouts = request.extensions.get("timeout", {})
- timeout = timeouts.get("pool", None)
-
- with self._optional_thread_lock:
- # Add the incoming request to our request queue.
- pool_request = AsyncPoolRequest(request)
- self._requests.append(pool_request)
-
- try:
- while True:
- with self._optional_thread_lock:
- # Assign incoming requests to available connections,
- # closing or creating new connections as required.
- closing = self._assign_requests_to_connections()
- await self._close_connections(closing)
-
- # Wait until this request has an assigned connection.
- connection = await pool_request.wait_for_connection(timeout=timeout)
-
- try:
- # Send the request on the assigned connection.
- response = await connection.handle_async_request(
- pool_request.request
- )
- except ConnectionNotAvailable:
- # In some cases a connection may initially be available to
- # handle a request, but then become unavailable.
- #
- # In this case we clear the connection and try again.
- pool_request.clear_connection()
- else:
- break # pragma: nocover
-
- except BaseException as exc:
- with self._optional_thread_lock:
- # For any exception or cancellation we remove the request from
- # the queue, and then re-assign requests to connections.
- self._requests.remove(pool_request)
- closing = self._assign_requests_to_connections()
-
- await self._close_connections(closing)
- raise exc from None
-
- # Return the response. Note that in this case we still have to manage
- # the point at which the response is closed.
- assert isinstance(response.stream, typing.AsyncIterable)
- return Response(
- status=response.status,
- headers=response.headers,
- content=PoolByteStream(
- stream=response.stream, pool_request=pool_request, pool=self
- ),
- extensions=response.extensions,
- )
-
- def _assign_requests_to_connections(self) -> list[AsyncConnectionInterface]:
- """
- Manage the state of the connection pool, assigning incoming
- requests to connections as available.
-
- Called whenever a new request is added or removed from the pool.
-
- Any closing connections are returned, allowing the I/O for closing
- those connections to be handled seperately.
- """
- closing_connections = []
-
- # First we handle cleaning up any connections that are closed,
- # have expired their keep-alive, or surplus idle connections.
- for connection in list(self._connections):
- if connection.is_closed():
- # log: "removing closed connection"
- self._connections.remove(connection)
- elif connection.has_expired():
- # log: "closing expired connection"
- self._connections.remove(connection)
- closing_connections.append(connection)
- elif (
- connection.is_idle()
- and len([connection.is_idle() for connection in self._connections])
- > self._max_keepalive_connections
- ):
- # log: "closing idle connection"
- self._connections.remove(connection)
- closing_connections.append(connection)
-
- # Assign queued requests to connections.
- queued_requests = [request for request in self._requests if request.is_queued()]
- for pool_request in queued_requests:
- origin = pool_request.request.url.origin
- available_connections = [
- connection
- for connection in self._connections
- if connection.can_handle_request(origin) and connection.is_available()
- ]
- idle_connections = [
- connection for connection in self._connections if connection.is_idle()
- ]
-
- # There are three cases for how we may be able to handle the request:
- #
- # 1. There is an existing connection that can handle the request.
- # 2. We can create a new connection to handle the request.
- # 3. We can close an idle connection and then create a new connection
- # to handle the request.
- if available_connections:
- # log: "reusing existing connection"
- connection = available_connections[0]
- pool_request.assign_to_connection(connection)
- elif len(self._connections) < self._max_connections:
- # log: "creating new connection"
- connection = self.create_connection(origin)
- self._connections.append(connection)
- pool_request.assign_to_connection(connection)
- elif idle_connections:
- # log: "closing idle connection"
- connection = idle_connections[0]
- self._connections.remove(connection)
- closing_connections.append(connection)
- # log: "creating new connection"
- connection = self.create_connection(origin)
- self._connections.append(connection)
- pool_request.assign_to_connection(connection)
-
- return closing_connections
-
- async def _close_connections(self, closing: list[AsyncConnectionInterface]) -> None:
- # Close connections which have been removed from the pool.
- with AsyncShieldCancellation():
- for connection in closing:
- await connection.aclose()
-
- async def aclose(self) -> None:
- # Explicitly close the connection pool.
- # Clears all existing requests and connections.
- with self._optional_thread_lock:
- closing_connections = list(self._connections)
- self._connections = []
- await self._close_connections(closing_connections)
-
- async def __aenter__(self) -> AsyncConnectionPool:
- return self
-
- async def __aexit__(
- self,
- exc_type: type[BaseException] | None = None,
- exc_value: BaseException | None = None,
- traceback: types.TracebackType | None = None,
- ) -> None:
- await self.aclose()
-
- def __repr__(self) -> str:
- class_name = self.__class__.__name__
- with self._optional_thread_lock:
- request_is_queued = [request.is_queued() for request in self._requests]
- connection_is_idle = [
- connection.is_idle() for connection in self._connections
- ]
-
- num_active_requests = request_is_queued.count(False)
- num_queued_requests = request_is_queued.count(True)
- num_active_connections = connection_is_idle.count(False)
- num_idle_connections = connection_is_idle.count(True)
-
- requests_info = (
- f"Requests: {num_active_requests} active, {num_queued_requests} queued"
- )
- connection_info = (
- f"Connections: {num_active_connections} active, {num_idle_connections} idle"
- )
-
- return f"<{class_name} [{requests_info} | {connection_info}]>"
-
-
-class PoolByteStream:
- def __init__(
- self,
- stream: typing.AsyncIterable[bytes],
- pool_request: AsyncPoolRequest,
- pool: AsyncConnectionPool,
- ) -> None:
- self._stream = stream
- self._pool_request = pool_request
- self._pool = pool
- self._closed = False
-
- async def __aiter__(self) -> typing.AsyncIterator[bytes]:
- try:
- async for part in self._stream:
- yield part
- except BaseException as exc:
- await self.aclose()
- raise exc from None
-
- async def aclose(self) -> None:
- if not self._closed:
- self._closed = True
- with AsyncShieldCancellation():
- if hasattr(self._stream, "aclose"):
- await self._stream.aclose()
-
- with self._pool._optional_thread_lock:
- self._pool._requests.remove(self._pool_request)
- closing = self._pool._assign_requests_to_connections()
-
- await self._pool._close_connections(closing)
diff --git a/contrib/python/httpcore/httpcore/_async/http11.py b/contrib/python/httpcore/httpcore/_async/http11.py
deleted file mode 100644
index e6d6d709852..00000000000
--- a/contrib/python/httpcore/httpcore/_async/http11.py
+++ /dev/null
@@ -1,379 +0,0 @@
-from __future__ import annotations
-
-import enum
-import logging
-import ssl
-import time
-import types
-import typing
-
-import h11
-
-from .._backends.base import AsyncNetworkStream
-from .._exceptions import (
- ConnectionNotAvailable,
- LocalProtocolError,
- RemoteProtocolError,
- WriteError,
- map_exceptions,
-)
-from .._models import Origin, Request, Response
-from .._synchronization import AsyncLock, AsyncShieldCancellation
-from .._trace import Trace
-from .interfaces import AsyncConnectionInterface
-
-logger = logging.getLogger("httpcore.http11")
-
-
-# A subset of `h11.Event` types supported by `_send_event`
-H11SendEvent = typing.Union[
- h11.Request,
- h11.Data,
- h11.EndOfMessage,
-]
-
-
-class HTTPConnectionState(enum.IntEnum):
- NEW = 0
- ACTIVE = 1
- IDLE = 2
- CLOSED = 3
-
-
-class AsyncHTTP11Connection(AsyncConnectionInterface):
- READ_NUM_BYTES = 64 * 1024
- MAX_INCOMPLETE_EVENT_SIZE = 100 * 1024
-
- def __init__(
- self,
- origin: Origin,
- stream: AsyncNetworkStream,
- keepalive_expiry: float | None = None,
- ) -> None:
- self._origin = origin
- self._network_stream = stream
- self._keepalive_expiry: float | None = keepalive_expiry
- self._expire_at: float | None = None
- self._state = HTTPConnectionState.NEW
- self._state_lock = AsyncLock()
- self._request_count = 0
- self._h11_state = h11.Connection(
- our_role=h11.CLIENT,
- max_incomplete_event_size=self.MAX_INCOMPLETE_EVENT_SIZE,
- )
-
- async def handle_async_request(self, request: Request) -> Response:
- if not self.can_handle_request(request.url.origin):
- raise RuntimeError(
- f"Attempted to send request to {request.url.origin} on connection "
- f"to {self._origin}"
- )
-
- async with self._state_lock:
- if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
- self._request_count += 1
- self._state = HTTPConnectionState.ACTIVE
- self._expire_at = None
- else:
- raise ConnectionNotAvailable()
-
- try:
- kwargs = {"request": request}
- try:
- async with Trace(
- "send_request_headers", logger, request, kwargs
- ) as trace:
- await self._send_request_headers(**kwargs)
- async with Trace("send_request_body", logger, request, kwargs) as trace:
- await self._send_request_body(**kwargs)
- except WriteError:
- # If we get a write error while we're writing the request,
- # then we supress this error and move on to attempting to
- # read the response. Servers can sometimes close the request
- # pre-emptively and then respond with a well formed HTTP
- # error response.
- pass
-
- async with Trace(
- "receive_response_headers", logger, request, kwargs
- ) as trace:
- (
- http_version,
- status,
- reason_phrase,
- headers,
- trailing_data,
- ) = await self._receive_response_headers(**kwargs)
- trace.return_value = (
- http_version,
- status,
- reason_phrase,
- headers,
- )
-
- network_stream = self._network_stream
-
- # CONNECT or Upgrade request
- if (status == 101) or (
- (request.method == b"CONNECT") and (200 <= status < 300)
- ):
- network_stream = AsyncHTTP11UpgradeStream(network_stream, trailing_data)
-
- return Response(
- status=status,
- headers=headers,
- content=HTTP11ConnectionByteStream(self, request),
- extensions={
- "http_version": http_version,
- "reason_phrase": reason_phrase,
- "network_stream": network_stream,
- },
- )
- except BaseException as exc:
- with AsyncShieldCancellation():
- async with Trace("response_closed", logger, request) as trace:
- await self._response_closed()
- raise exc
-
- # Sending the request...
-
- async def _send_request_headers(self, request: Request) -> None:
- timeouts = request.extensions.get("timeout", {})
- timeout = timeouts.get("write", None)
-
- with map_exceptions({h11.LocalProtocolError: LocalProtocolError}):
- event = h11.Request(
- method=request.method,
- target=request.url.target,
- headers=request.headers,
- )
- await self._send_event(event, timeout=timeout)
-
- async def _send_request_body(self, request: Request) -> None:
- timeouts = request.extensions.get("timeout", {})
- timeout = timeouts.get("write", None)
-
- assert isinstance(request.stream, typing.AsyncIterable)
- async for chunk in request.stream:
- event = h11.Data(data=chunk)
- await self._send_event(event, timeout=timeout)
-
- await self._send_event(h11.EndOfMessage(), timeout=timeout)
-
- async def _send_event(self, event: h11.Event, timeout: float | None = None) -> None:
- bytes_to_send = self._h11_state.send(event)
- if bytes_to_send is not None:
- await self._network_stream.write(bytes_to_send, timeout=timeout)
-
- # Receiving the response...
-
- async def _receive_response_headers(
- self, request: Request
- ) -> tuple[bytes, int, bytes, list[tuple[bytes, bytes]], bytes]:
- timeouts = request.extensions.get("timeout", {})
- timeout = timeouts.get("read", None)
-
- while True:
- event = await self._receive_event(timeout=timeout)
- if isinstance(event, h11.Response):
- break
- if (
- isinstance(event, h11.InformationalResponse)
- and event.status_code == 101
- ):
- break
-
- http_version = b"HTTP/" + event.http_version
-
- # h11 version 0.11+ supports a `raw_items` interface to get the
- # raw header casing, rather than the enforced lowercase headers.
- headers = event.headers.raw_items()
-
- trailing_data, _ = self._h11_state.trailing_data
-
- return http_version, event.status_code, event.reason, headers, trailing_data
-
- async def _receive_response_body(
- self, request: Request
- ) -> typing.AsyncIterator[bytes]:
- timeouts = request.extensions.get("timeout", {})
- timeout = timeouts.get("read", None)
-
- while True:
- event = await self._receive_event(timeout=timeout)
- if isinstance(event, h11.Data):
- yield bytes(event.data)
- elif isinstance(event, (h11.EndOfMessage, h11.PAUSED)):
- break
-
- async def _receive_event(
- self, timeout: float | None = None
- ) -> h11.Event | type[h11.PAUSED]:
- while True:
- with map_exceptions({h11.RemoteProtocolError: RemoteProtocolError}):
- event = self._h11_state.next_event()
-
- if event is h11.NEED_DATA:
- data = await self._network_stream.read(
- self.READ_NUM_BYTES, timeout=timeout
- )
-
- # If we feed this case through h11 we'll raise an exception like:
- #
- # httpcore.RemoteProtocolError: can't handle event type
- # ConnectionClosed when role=SERVER and state=SEND_RESPONSE
- #
- # Which is accurate, but not very informative from an end-user
- # perspective. Instead we handle this case distinctly and treat
- # it as a ConnectError.
- if data == b"" and self._h11_state.their_state == h11.SEND_RESPONSE:
- msg = "Server disconnected without sending a response."
- raise RemoteProtocolError(msg)
-
- self._h11_state.receive_data(data)
- else:
- # mypy fails to narrow the type in the above if statement above
- return event # type: ignore[return-value]
-
- async def _response_closed(self) -> None:
- async with self._state_lock:
- if (
- self._h11_state.our_state is h11.DONE
- and self._h11_state.their_state is h11.DONE
- ):
- self._state = HTTPConnectionState.IDLE
- self._h11_state.start_next_cycle()
- if self._keepalive_expiry is not None:
- now = time.monotonic()
- self._expire_at = now + self._keepalive_expiry
- else:
- await self.aclose()
-
- # Once the connection is no longer required...
-
- async def aclose(self) -> None:
- # Note that this method unilaterally closes the connection, and does
- # not have any kind of locking in place around it.
- self._state = HTTPConnectionState.CLOSED
- await self._network_stream.aclose()
-
- # The AsyncConnectionInterface methods provide information about the state of
- # the connection, allowing for a connection pooling implementation to
- # determine when to reuse and when to close the connection...
-
- def can_handle_request(self, origin: Origin) -> bool:
- return origin == self._origin
-
- def is_available(self) -> bool:
- # Note that HTTP/1.1 connections in the "NEW" state are not treated as
- # being "available". The control flow which created the connection will
- # be able to send an outgoing request, but the connection will not be
- # acquired from the connection pool for any other request.
- return self._state == HTTPConnectionState.IDLE
-
- def has_expired(self) -> bool:
- now = time.monotonic()
- keepalive_expired = self._expire_at is not None and now > self._expire_at
-
- # If the HTTP connection is idle but the socket is readable, then the
- # only valid state is that the socket is about to return b"", indicating
- # a server-initiated disconnect.
- server_disconnected = (
- self._state == HTTPConnectionState.IDLE
- and self._network_stream.get_extra_info("is_readable")
- )
-
- return keepalive_expired or server_disconnected
-
- def is_idle(self) -> bool:
- return self._state == HTTPConnectionState.IDLE
-
- def is_closed(self) -> bool:
- return self._state == HTTPConnectionState.CLOSED
-
- def info(self) -> str:
- origin = str(self._origin)
- return (
- f"{origin!r}, HTTP/1.1, {self._state.name}, "
- f"Request Count: {self._request_count}"
- )
-
- def __repr__(self) -> str:
- class_name = self.__class__.__name__
- origin = str(self._origin)
- return (
- f"<{class_name} [{origin!r}, {self._state.name}, "
- f"Request Count: {self._request_count}]>"
- )
-
- # These context managers are not used in the standard flow, but are
- # useful for testing or working with connection instances directly.
-
- async def __aenter__(self) -> AsyncHTTP11Connection:
- return self
-
- async def __aexit__(
- self,
- exc_type: type[BaseException] | None = None,
- exc_value: BaseException | None = None,
- traceback: types.TracebackType | None = None,
- ) -> None:
- await self.aclose()
-
-
-class HTTP11ConnectionByteStream:
- def __init__(self, connection: AsyncHTTP11Connection, request: Request) -> None:
- self._connection = connection
- self._request = request
- self._closed = False
-
- async def __aiter__(self) -> typing.AsyncIterator[bytes]:
- kwargs = {"request": self._request}
- try:
- async with Trace("receive_response_body", logger, self._request, kwargs):
- async for chunk in self._connection._receive_response_body(**kwargs):
- yield chunk
- except BaseException as exc:
- # If we get an exception while streaming the response,
- # we want to close the response (and possibly the connection)
- # before raising that exception.
- with AsyncShieldCancellation():
- await self.aclose()
- raise exc
-
- async def aclose(self) -> None:
- if not self._closed:
- self._closed = True
- async with Trace("response_closed", logger, self._request):
- await self._connection._response_closed()
-
-
-class AsyncHTTP11UpgradeStream(AsyncNetworkStream):
- def __init__(self, stream: AsyncNetworkStream, leading_data: bytes) -> None:
- self._stream = stream
- self._leading_data = leading_data
-
- async def read(self, max_bytes: int, timeout: float | None = None) -> bytes:
- if self._leading_data:
- buffer = self._leading_data[:max_bytes]
- self._leading_data = self._leading_data[max_bytes:]
- return buffer
- else:
- return await self._stream.read(max_bytes, timeout)
-
- async def write(self, buffer: bytes, timeout: float | None = None) -> None:
- await self._stream.write(buffer, timeout)
-
- async def aclose(self) -> None:
- await self._stream.aclose()
-
- async def start_tls(
- self,
- ssl_context: ssl.SSLContext,
- server_hostname: str | None = None,
- timeout: float | None = None,
- ) -> AsyncNetworkStream:
- return await self._stream.start_tls(ssl_context, server_hostname, timeout)
-
- def get_extra_info(self, info: str) -> typing.Any:
- return self._stream.get_extra_info(info)
diff --git a/contrib/python/httpcore/httpcore/_async/http2.py b/contrib/python/httpcore/httpcore/_async/http2.py
deleted file mode 100644
index c6434a04969..00000000000
--- a/contrib/python/httpcore/httpcore/_async/http2.py
+++ /dev/null
@@ -1,583 +0,0 @@
-from __future__ import annotations
-
-import enum
-import logging
-import time
-import types
-import typing
-
-import h2.config
-import h2.connection
-import h2.events
-import h2.exceptions
-import h2.settings
-
-from .._backends.base import AsyncNetworkStream
-from .._exceptions import (
- ConnectionNotAvailable,
- LocalProtocolError,
- RemoteProtocolError,
-)
-from .._models import Origin, Request, Response
-from .._synchronization import AsyncLock, AsyncSemaphore, AsyncShieldCancellation
-from .._trace import Trace
-from .interfaces import AsyncConnectionInterface
-
-logger = logging.getLogger("httpcore.http2")
-
-
-def has_body_headers(request: Request) -> bool:
- return any(
- k.lower() == b"content-length" or k.lower() == b"transfer-encoding"
- for k, v in request.headers
- )
-
-
-class HTTPConnectionState(enum.IntEnum):
- ACTIVE = 1
- IDLE = 2
- CLOSED = 3
-
-
-class AsyncHTTP2Connection(AsyncConnectionInterface):
- READ_NUM_BYTES = 64 * 1024
- CONFIG = h2.config.H2Configuration(validate_inbound_headers=False)
-
- def __init__(
- self,
- origin: Origin,
- stream: AsyncNetworkStream,
- keepalive_expiry: float | None = None,
- ):
- self._origin = origin
- self._network_stream = stream
- self._keepalive_expiry: float | None = keepalive_expiry
- self._h2_state = h2.connection.H2Connection(config=self.CONFIG)
- self._state = HTTPConnectionState.IDLE
- self._expire_at: float | None = None
- self._request_count = 0
- self._init_lock = AsyncLock()
- self._state_lock = AsyncLock()
- self._read_lock = AsyncLock()
- self._write_lock = AsyncLock()
- self._sent_connection_init = False
- self._used_all_stream_ids = False
- self._connection_error = False
-
- # Mapping from stream ID to response stream events.
- self._events: dict[
- int,
- h2.events.ResponseReceived
- | h2.events.DataReceived
- | h2.events.StreamEnded
- | h2.events.StreamReset,
- ] = {}
-
- # Connection terminated events are stored as state since
- # we need to handle them for all streams.
- self._connection_terminated: h2.events.ConnectionTerminated | None = None
-
- self._read_exception: Exception | None = None
- self._write_exception: Exception | None = None
-
- async def handle_async_request(self, request: Request) -> Response:
- if not self.can_handle_request(request.url.origin):
- # This cannot occur in normal operation, since the connection pool
- # will only send requests on connections that handle them.
- # It's in place simply for resilience as a guard against incorrect
- # usage, for anyone working directly with httpcore connections.
- raise RuntimeError(
- f"Attempted to send request to {request.url.origin} on connection "
- f"to {self._origin}"
- )
-
- async with self._state_lock:
- if self._state in (HTTPConnectionState.ACTIVE, HTTPConnectionState.IDLE):
- self._request_count += 1
- self._expire_at = None
- self._state = HTTPConnectionState.ACTIVE
- else:
- raise ConnectionNotAvailable()
-
- async with self._init_lock:
- if not self._sent_connection_init:
- try:
- kwargs = {"request": request}
- async with Trace("send_connection_init", logger, request, kwargs):
- await self._send_connection_init(**kwargs)
- except BaseException as exc:
- with AsyncShieldCancellation():
- await self.aclose()
- raise exc
-
- self._sent_connection_init = True
-
- # Initially start with just 1 until the remote server provides
- # its max_concurrent_streams value
- self._max_streams = 1
-
- local_settings_max_streams = (
- self._h2_state.local_settings.max_concurrent_streams
- )
- self._max_streams_semaphore = AsyncSemaphore(local_settings_max_streams)
-
- for _ in range(local_settings_max_streams - self._max_streams):
- await self._max_streams_semaphore.acquire()
-
- await self._max_streams_semaphore.acquire()
-
- try:
- stream_id = self._h2_state.get_next_available_stream_id()
- self._events[stream_id] = []
- except h2.exceptions.NoAvailableStreamIDError: # pragma: nocover
- self._used_all_stream_ids = True
- self._request_count -= 1
- raise ConnectionNotAvailable()
-
- try:
- kwargs = {"request": request, "stream_id": stream_id}
- async with Trace("send_request_headers", logger, request, kwargs):
- await self._send_request_headers(request=request, stream_id=stream_id)
- async with Trace("send_request_body", logger, request, kwargs):
- await self._send_request_body(request=request, stream_id=stream_id)
- async with Trace(
- "receive_response_headers", logger, request, kwargs
- ) as trace:
- status, headers = await self._receive_response(
- request=request, stream_id=stream_id
- )
- trace.return_value = (status, headers)
-
- return Response(
- status=status,
- headers=headers,
- content=HTTP2ConnectionByteStream(self, request, stream_id=stream_id),
- extensions={
- "http_version": b"HTTP/2",
- "network_stream": self._network_stream,
- "stream_id": stream_id,
- },
- )
- except BaseException as exc: # noqa: PIE786
- with AsyncShieldCancellation():
- kwargs = {"stream_id": stream_id}
- async with Trace("response_closed", logger, request, kwargs):
- await self._response_closed(stream_id=stream_id)
-
- if isinstance(exc, h2.exceptions.ProtocolError):
- # One case where h2 can raise a protocol error is when a
- # closed frame has been seen by the state machine.
- #
- # This happens when one stream is reading, and encounters
- # a GOAWAY event. Other flows of control may then raise
- # a protocol error at any point they interact with the 'h2_state'.
- #
- # In this case we'll have stored the event, and should raise
- # it as a RemoteProtocolError.
- if self._connection_terminated: # pragma: nocover
- raise RemoteProtocolError(self._connection_terminated)
- # If h2 raises a protocol error in some other state then we
- # must somehow have made a protocol violation.
- raise LocalProtocolError(exc) # pragma: nocover
-
- raise exc
-
- async def _send_connection_init(self, request: Request) -> None:
- """
- The HTTP/2 connection requires some initial setup before we can start
- using individual request/response streams on it.
- """
- # Need to set these manually here instead of manipulating via
- # __setitem__() otherwise the H2Connection will emit SettingsUpdate
- # frames in addition to sending the undesired defaults.
- self._h2_state.local_settings = h2.settings.Settings(
- client=True,
- initial_values={
- # Disable PUSH_PROMISE frames from the server since we don't do anything
- # with them for now. Maybe when we support caching?
- h2.settings.SettingCodes.ENABLE_PUSH: 0,
- # These two are taken from h2 for safe defaults
- h2.settings.SettingCodes.MAX_CONCURRENT_STREAMS: 100,
- h2.settings.SettingCodes.MAX_HEADER_LIST_SIZE: 65536,
- },
- )
-
- # Some websites (*cough* Yahoo *cough*) balk at this setting being
- # present in the initial handshake since it's not defined in the original
- # RFC despite the RFC mandating ignoring settings you don't know about.
- del self._h2_state.local_settings[
- h2.settings.SettingCodes.ENABLE_CONNECT_PROTOCOL
- ]
-
- self._h2_state.initiate_connection()
- self._h2_state.increment_flow_control_window(2**24)
- await self._write_outgoing_data(request)
-
- # Sending the request...
-
- async def _send_request_headers(self, request: Request, stream_id: int) -> None:
- """
- Send the request headers to a given stream ID.
- """
- end_stream = not has_body_headers(request)
-
- # In HTTP/2 the ':authority' pseudo-header is used instead of 'Host'.
- # In order to gracefully handle HTTP/1.1 and HTTP/2 we always require
- # HTTP/1.1 style headers, and map them appropriately if we end up on
- # an HTTP/2 connection.
- authority = [v for k, v in request.headers if k.lower() == b"host"][0]
-
- headers = [
- (b":method", request.method),
- (b":authority", authority),
- (b":scheme", request.url.scheme),
- (b":path", request.url.target),
- ] + [
- (k.lower(), v)
- for k, v in request.headers
- if k.lower()
- not in (
- b"host",
- b"transfer-encoding",
- )
- ]
-
- self._h2_state.send_headers(stream_id, headers, end_stream=end_stream)
- self._h2_state.increment_flow_control_window(2**24, stream_id=stream_id)
- await self._write_outgoing_data(request)
-
- async def _send_request_body(self, request: Request, stream_id: int) -> None:
- """
- Iterate over the request body sending it to a given stream ID.
- """
- if not has_body_headers(request):
- return
-
- assert isinstance(request.stream, typing.AsyncIterable)
- async for data in request.stream:
- await self._send_stream_data(request, stream_id, data)
- await self._send_end_stream(request, stream_id)
-
- async def _send_stream_data(
- self, request: Request, stream_id: int, data: bytes
- ) -> None:
- """
- Send a single chunk of data in one or more data frames.
- """
- while data:
- max_flow = await self._wait_for_outgoing_flow(request, stream_id)
- chunk_size = min(len(data), max_flow)
- chunk, data = data[:chunk_size], data[chunk_size:]
- self._h2_state.send_data(stream_id, chunk)
- await self._write_outgoing_data(request)
-
- async def _send_end_stream(self, request: Request, stream_id: int) -> None:
- """
- Send an empty data frame on on a given stream ID with the END_STREAM flag set.
- """
- self._h2_state.end_stream(stream_id)
- await self._write_outgoing_data(request)
-
- # Receiving the response...
-
- async def _receive_response(
- self, request: Request, stream_id: int
- ) -> tuple[int, list[tuple[bytes, bytes]]]:
- """
- Return the response status code and headers for a given stream ID.
- """
- while True:
- event = await self._receive_stream_event(request, stream_id)
- if isinstance(event, h2.events.ResponseReceived):
- break
-
- status_code = 200
- headers = []
- for k, v in event.headers:
- if k == b":status":
- status_code = int(v.decode("ascii", errors="ignore"))
- elif not k.startswith(b":"):
- headers.append((k, v))
-
- return (status_code, headers)
-
- async def _receive_response_body(
- self, request: Request, stream_id: int
- ) -> typing.AsyncIterator[bytes]:
- """
- Iterator that returns the bytes of the response body for a given stream ID.
- """
- while True:
- event = await self._receive_stream_event(request, stream_id)
- if isinstance(event, h2.events.DataReceived):
- amount = event.flow_controlled_length
- self._h2_state.acknowledge_received_data(amount, stream_id)
- await self._write_outgoing_data(request)
- yield event.data
- elif isinstance(event, h2.events.StreamEnded):
- break
-
- async def _receive_stream_event(
- self, request: Request, stream_id: int
- ) -> h2.events.ResponseReceived | h2.events.DataReceived | h2.events.StreamEnded:
- """
- Return the next available event for a given stream ID.
-
- Will read more data from the network if required.
- """
- while not self._events.get(stream_id):
- await self._receive_events(request, stream_id)
- event = self._events[stream_id].pop(0)
- if isinstance(event, h2.events.StreamReset):
- raise RemoteProtocolError(event)
- return event
-
- async def _receive_events(
- self, request: Request, stream_id: int | None = None
- ) -> None:
- """
- Read some data from the network until we see one or more events
- for a given stream ID.
- """
- async with self._read_lock:
- if self._connection_terminated is not None:
- last_stream_id = self._connection_terminated.last_stream_id
- if stream_id and last_stream_id and stream_id > last_stream_id:
- self._request_count -= 1
- raise ConnectionNotAvailable()
- raise RemoteProtocolError(self._connection_terminated)
-
- # This conditional is a bit icky. We don't want to block reading if we've
- # actually got an event to return for a given stream. We need to do that
- # check *within* the atomic read lock. Though it also need to be optional,
- # because when we call it from `_wait_for_outgoing_flow` we *do* want to
- # block until we've available flow control, event when we have events
- # pending for the stream ID we're attempting to send on.
- if stream_id is None or not self._events.get(stream_id):
- events = await self._read_incoming_data(request)
- for event in events:
- if isinstance(event, h2.events.RemoteSettingsChanged):
- async with Trace(
- "receive_remote_settings", logger, request
- ) as trace:
- await self._receive_remote_settings_change(event)
- trace.return_value = event
-
- elif isinstance(
- event,
- (
- h2.events.ResponseReceived,
- h2.events.DataReceived,
- h2.events.StreamEnded,
- h2.events.StreamReset,
- ),
- ):
- if event.stream_id in self._events:
- self._events[event.stream_id].append(event)
-
- elif isinstance(event, h2.events.ConnectionTerminated):
- self._connection_terminated = event
-
- await self._write_outgoing_data(request)
-
- async def _receive_remote_settings_change(self, event: h2.events.Event) -> None:
- max_concurrent_streams = event.changed_settings.get(
- h2.settings.SettingCodes.MAX_CONCURRENT_STREAMS
- )
- if max_concurrent_streams:
- new_max_streams = min(
- max_concurrent_streams.new_value,
- self._h2_state.local_settings.max_concurrent_streams,
- )
- if new_max_streams and new_max_streams != self._max_streams:
- while new_max_streams > self._max_streams:
- await self._max_streams_semaphore.release()
- self._max_streams += 1
- while new_max_streams < self._max_streams:
- await self._max_streams_semaphore.acquire()
- self._max_streams -= 1
-
- async def _response_closed(self, stream_id: int) -> None:
- await self._max_streams_semaphore.release()
- del self._events[stream_id]
- async with self._state_lock:
- if self._connection_terminated and not self._events:
- await self.aclose()
-
- elif self._state == HTTPConnectionState.ACTIVE and not self._events:
- self._state = HTTPConnectionState.IDLE
- if self._keepalive_expiry is not None:
- now = time.monotonic()
- self._expire_at = now + self._keepalive_expiry
- if self._used_all_stream_ids: # pragma: nocover
- await self.aclose()
-
- async def aclose(self) -> None:
- # Note that this method unilaterally closes the connection, and does
- # not have any kind of locking in place around it.
- self._h2_state.close_connection()
- self._state = HTTPConnectionState.CLOSED
- await self._network_stream.aclose()
-
- # Wrappers around network read/write operations...
-
- async def _read_incoming_data(self, request: Request) -> list[h2.events.Event]:
- timeouts = request.extensions.get("timeout", {})
- timeout = timeouts.get("read", None)
-
- if self._read_exception is not None:
- raise self._read_exception # pragma: nocover
-
- try:
- data = await self._network_stream.read(self.READ_NUM_BYTES, timeout)
- if data == b"":
- raise RemoteProtocolError("Server disconnected")
- except Exception as exc:
- # If we get a network error we should:
- #
- # 1. Save the exception and just raise it immediately on any future reads.
- # (For example, this means that a single read timeout or disconnect will
- # immediately close all pending streams. Without requiring multiple
- # sequential timeouts.)
- # 2. Mark the connection as errored, so that we don't accept any other
- # incoming requests.
- self._read_exception = exc
- self._connection_error = True
- raise exc
-
- events: list[h2.events.Event] = self._h2_state.receive_data(data)
-
- return events
-
- async def _write_outgoing_data(self, request: Request) -> None:
- timeouts = request.extensions.get("timeout", {})
- timeout = timeouts.get("write", None)
-
- async with self._write_lock:
- data_to_send = self._h2_state.data_to_send()
-
- if self._write_exception is not None:
- raise self._write_exception # pragma: nocover
-
- try:
- await self._network_stream.write(data_to_send, timeout)
- except Exception as exc: # pragma: nocover
- # If we get a network error we should:
- #
- # 1. Save the exception and just raise it immediately on any future write.
- # (For example, this means that a single write timeout or disconnect will
- # immediately close all pending streams. Without requiring multiple
- # sequential timeouts.)
- # 2. Mark the connection as errored, so that we don't accept any other
- # incoming requests.
- self._write_exception = exc
- self._connection_error = True
- raise exc
-
- # Flow control...
-
- async def _wait_for_outgoing_flow(self, request: Request, stream_id: int) -> int:
- """
- Returns the maximum allowable outgoing flow for a given stream.
-
- If the allowable flow is zero, then waits on the network until
- WindowUpdated frames have increased the flow rate.
- https://tools.ietf.org/html/rfc7540#section-6.9
- """
- local_flow: int = self._h2_state.local_flow_control_window(stream_id)
- max_frame_size: int = self._h2_state.max_outbound_frame_size
- flow = min(local_flow, max_frame_size)
- while flow == 0:
- await self._receive_events(request)
- local_flow = self._h2_state.local_flow_control_window(stream_id)
- max_frame_size = self._h2_state.max_outbound_frame_size
- flow = min(local_flow, max_frame_size)
- return flow
-
- # Interface for connection pooling...
-
- def can_handle_request(self, origin: Origin) -> bool:
- return origin == self._origin
-
- def is_available(self) -> bool:
- return (
- self._state != HTTPConnectionState.CLOSED
- and not self._connection_error
- and not self._used_all_stream_ids
- and not (
- self._h2_state.state_machine.state
- == h2.connection.ConnectionState.CLOSED
- )
- )
-
- def has_expired(self) -> bool:
- now = time.monotonic()
- return self._expire_at is not None and now > self._expire_at
-
- def is_idle(self) -> bool:
- return self._state == HTTPConnectionState.IDLE
-
- def is_closed(self) -> bool:
- return self._state == HTTPConnectionState.CLOSED
-
- def info(self) -> str:
- origin = str(self._origin)
- return (
- f"{origin!r}, HTTP/2, {self._state.name}, "
- f"Request Count: {self._request_count}"
- )
-
- def __repr__(self) -> str:
- class_name = self.__class__.__name__
- origin = str(self._origin)
- return (
- f"<{class_name} [{origin!r}, {self._state.name}, "
- f"Request Count: {self._request_count}]>"
- )
-
- # These context managers are not used in the standard flow, but are
- # useful for testing or working with connection instances directly.
-
- async def __aenter__(self) -> AsyncHTTP2Connection:
- return self
-
- async def __aexit__(
- self,
- exc_type: type[BaseException] | None = None,
- exc_value: BaseException | None = None,
- traceback: types.TracebackType | None = None,
- ) -> None:
- await self.aclose()
-
-
-class HTTP2ConnectionByteStream:
- def __init__(
- self, connection: AsyncHTTP2Connection, request: Request, stream_id: int
- ) -> None:
- self._connection = connection
- self._request = request
- self._stream_id = stream_id
- self._closed = False
-
- async def __aiter__(self) -> typing.AsyncIterator[bytes]:
- kwargs = {"request": self._request, "stream_id": self._stream_id}
- try:
- async with Trace("receive_response_body", logger, self._request, kwargs):
- async for chunk in self._connection._receive_response_body(
- request=self._request, stream_id=self._stream_id
- ):
- yield chunk
- except BaseException as exc:
- # If we get an exception while streaming the response,
- # we want to close the response (and possibly the connection)
- # before raising that exception.
- with AsyncShieldCancellation():
- await self.aclose()
- raise exc
-
- async def aclose(self) -> None:
- if not self._closed:
- self._closed = True
- kwargs = {"stream_id": self._stream_id}
- async with Trace("response_closed", logger, self._request, kwargs):
- await self._connection._response_closed(stream_id=self._stream_id)
diff --git a/contrib/python/httpcore/httpcore/_async/http_proxy.py b/contrib/python/httpcore/httpcore/_async/http_proxy.py
deleted file mode 100644
index cc9d92066e1..00000000000
--- a/contrib/python/httpcore/httpcore/_async/http_proxy.py
+++ /dev/null
@@ -1,367 +0,0 @@
-from __future__ import annotations
-
-import base64
-import logging
-import ssl
-import typing
-
-from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend
-from .._exceptions import ProxyError
-from .._models import (
- URL,
- Origin,
- Request,
- Response,
- enforce_bytes,
- enforce_headers,
- enforce_url,
-)
-from .._ssl import default_ssl_context
-from .._synchronization import AsyncLock
-from .._trace import Trace
-from .connection import AsyncHTTPConnection
-from .connection_pool import AsyncConnectionPool
-from .http11 import AsyncHTTP11Connection
-from .interfaces import AsyncConnectionInterface
-
-ByteOrStr = typing.Union[bytes, str]
-HeadersAsSequence = typing.Sequence[typing.Tuple[ByteOrStr, ByteOrStr]]
-HeadersAsMapping = typing.Mapping[ByteOrStr, ByteOrStr]
-
-
-logger = logging.getLogger("httpcore.proxy")
-
-
-def merge_headers(
- default_headers: typing.Sequence[tuple[bytes, bytes]] | None = None,
- override_headers: typing.Sequence[tuple[bytes, bytes]] | None = None,
-) -> list[tuple[bytes, bytes]]:
- """
- Append default_headers and override_headers, de-duplicating if a key exists
- in both cases.
- """
- default_headers = [] if default_headers is None else list(default_headers)
- override_headers = [] if override_headers is None else list(override_headers)
- has_override = set(key.lower() for key, value in override_headers)
- default_headers = [
- (key, value)
- for key, value in default_headers
- if key.lower() not in has_override
- ]
- return default_headers + override_headers
-
-
-class AsyncHTTPProxy(AsyncConnectionPool): # pragma: nocover
- """
- A connection pool that sends requests via an HTTP proxy.
- """
-
- def __init__(
- self,
- proxy_url: URL | bytes | str,
- proxy_auth: tuple[bytes | str, bytes | str] | None = None,
- proxy_headers: HeadersAsMapping | HeadersAsSequence | None = None,
- ssl_context: ssl.SSLContext | None = None,
- proxy_ssl_context: ssl.SSLContext | None = None,
- max_connections: int | None = 10,
- max_keepalive_connections: int | None = None,
- keepalive_expiry: float | None = None,
- http1: bool = True,
- http2: bool = False,
- retries: int = 0,
- local_address: str | None = None,
- uds: str | None = None,
- network_backend: AsyncNetworkBackend | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> None:
- """
- A connection pool for making HTTP requests.
-
- Parameters:
- proxy_url: The URL to use when connecting to the proxy server.
- For example `"http://127.0.0.1:8080/"`.
- proxy_auth: Any proxy authentication as a two-tuple of
- (username, password). May be either bytes or ascii-only str.
- proxy_headers: Any HTTP headers to use for the proxy requests.
- For example `{"Proxy-Authorization": "Basic <username>:<password>"}`.
- ssl_context: An SSL context to use for verifying connections.
- If not specified, the default `httpcore.default_ssl_context()`
- will be used.
- proxy_ssl_context: The same as `ssl_context`, but for a proxy server rather than a remote origin.
- max_connections: The maximum number of concurrent HTTP connections that
- the pool should allow. Any attempt to send a request on a pool that
- would exceed this amount will block until a connection is available.
- max_keepalive_connections: The maximum number of idle HTTP connections
- that will be maintained in the pool.
- keepalive_expiry: The duration in seconds that an idle HTTP connection
- may be maintained for before being expired from the pool.
- http1: A boolean indicating if HTTP/1.1 requests should be supported
- by the connection pool. Defaults to True.
- http2: A boolean indicating if HTTP/2 requests should be supported by
- the connection pool. Defaults to False.
- retries: The maximum number of retries when trying to establish
- a connection.
- local_address: Local address to connect from. Can also be used to
- connect using a particular address family. Using
- `local_address="0.0.0.0"` will connect using an `AF_INET` address
- (IPv4), while using `local_address="::"` will connect using an
- `AF_INET6` address (IPv6).
- uds: Path to a Unix Domain Socket to use instead of TCP sockets.
- network_backend: A backend instance to use for handling network I/O.
- """
- super().__init__(
- ssl_context=ssl_context,
- max_connections=max_connections,
- max_keepalive_connections=max_keepalive_connections,
- keepalive_expiry=keepalive_expiry,
- http1=http1,
- http2=http2,
- network_backend=network_backend,
- retries=retries,
- local_address=local_address,
- uds=uds,
- socket_options=socket_options,
- )
-
- self._proxy_url = enforce_url(proxy_url, name="proxy_url")
- if (
- self._proxy_url.scheme == b"http" and proxy_ssl_context is not None
- ): # pragma: no cover
- raise RuntimeError(
- "The `proxy_ssl_context` argument is not allowed for the http scheme"
- )
-
- self._ssl_context = ssl_context
- self._proxy_ssl_context = proxy_ssl_context
- self._proxy_headers = enforce_headers(proxy_headers, name="proxy_headers")
- if proxy_auth is not None:
- username = enforce_bytes(proxy_auth[0], name="proxy_auth")
- password = enforce_bytes(proxy_auth[1], name="proxy_auth")
- userpass = username + b":" + password
- authorization = b"Basic " + base64.b64encode(userpass)
- self._proxy_headers = [
- (b"Proxy-Authorization", authorization)
- ] + self._proxy_headers
-
- def create_connection(self, origin: Origin) -> AsyncConnectionInterface:
- if origin.scheme == b"http":
- return AsyncForwardHTTPConnection(
- proxy_origin=self._proxy_url.origin,
- proxy_headers=self._proxy_headers,
- remote_origin=origin,
- keepalive_expiry=self._keepalive_expiry,
- network_backend=self._network_backend,
- proxy_ssl_context=self._proxy_ssl_context,
- )
- return AsyncTunnelHTTPConnection(
- proxy_origin=self._proxy_url.origin,
- proxy_headers=self._proxy_headers,
- remote_origin=origin,
- ssl_context=self._ssl_context,
- proxy_ssl_context=self._proxy_ssl_context,
- keepalive_expiry=self._keepalive_expiry,
- http1=self._http1,
- http2=self._http2,
- network_backend=self._network_backend,
- )
-
-
-class AsyncForwardHTTPConnection(AsyncConnectionInterface):
- def __init__(
- self,
- proxy_origin: Origin,
- remote_origin: Origin,
- proxy_headers: HeadersAsMapping | HeadersAsSequence | None = None,
- keepalive_expiry: float | None = None,
- network_backend: AsyncNetworkBackend | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- proxy_ssl_context: ssl.SSLContext | None = None,
- ) -> None:
- self._connection = AsyncHTTPConnection(
- origin=proxy_origin,
- keepalive_expiry=keepalive_expiry,
- network_backend=network_backend,
- socket_options=socket_options,
- ssl_context=proxy_ssl_context,
- )
- self._proxy_origin = proxy_origin
- self._proxy_headers = enforce_headers(proxy_headers, name="proxy_headers")
- self._remote_origin = remote_origin
-
- async def handle_async_request(self, request: Request) -> Response:
- headers = merge_headers(self._proxy_headers, request.headers)
- url = URL(
- scheme=self._proxy_origin.scheme,
- host=self._proxy_origin.host,
- port=self._proxy_origin.port,
- target=bytes(request.url),
- )
- proxy_request = Request(
- method=request.method,
- url=url,
- headers=headers,
- content=request.stream,
- extensions=request.extensions,
- )
- return await self._connection.handle_async_request(proxy_request)
-
- def can_handle_request(self, origin: Origin) -> bool:
- return origin == self._remote_origin
-
- async def aclose(self) -> None:
- await self._connection.aclose()
-
- def info(self) -> str:
- return self._connection.info()
-
- def is_available(self) -> bool:
- return self._connection.is_available()
-
- def has_expired(self) -> bool:
- return self._connection.has_expired()
-
- def is_idle(self) -> bool:
- return self._connection.is_idle()
-
- def is_closed(self) -> bool:
- return self._connection.is_closed()
-
- def __repr__(self) -> str:
- return f"<{self.__class__.__name__} [{self.info()}]>"
-
-
-class AsyncTunnelHTTPConnection(AsyncConnectionInterface):
- def __init__(
- self,
- proxy_origin: Origin,
- remote_origin: Origin,
- ssl_context: ssl.SSLContext | None = None,
- proxy_ssl_context: ssl.SSLContext | None = None,
- proxy_headers: typing.Sequence[tuple[bytes, bytes]] | None = None,
- keepalive_expiry: float | None = None,
- http1: bool = True,
- http2: bool = False,
- network_backend: AsyncNetworkBackend | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> None:
- self._connection: AsyncConnectionInterface = AsyncHTTPConnection(
- origin=proxy_origin,
- keepalive_expiry=keepalive_expiry,
- network_backend=network_backend,
- socket_options=socket_options,
- ssl_context=proxy_ssl_context,
- )
- self._proxy_origin = proxy_origin
- self._remote_origin = remote_origin
- self._ssl_context = ssl_context
- self._proxy_ssl_context = proxy_ssl_context
- self._proxy_headers = enforce_headers(proxy_headers, name="proxy_headers")
- self._keepalive_expiry = keepalive_expiry
- self._http1 = http1
- self._http2 = http2
- self._connect_lock = AsyncLock()
- self._connected = False
-
- async def handle_async_request(self, request: Request) -> Response:
- timeouts = request.extensions.get("timeout", {})
- timeout = timeouts.get("connect", None)
-
- async with self._connect_lock:
- if not self._connected:
- target = b"%b:%d" % (self._remote_origin.host, self._remote_origin.port)
-
- connect_url = URL(
- scheme=self._proxy_origin.scheme,
- host=self._proxy_origin.host,
- port=self._proxy_origin.port,
- target=target,
- )
- connect_headers = merge_headers(
- [(b"Host", target), (b"Accept", b"*/*")], self._proxy_headers
- )
- connect_request = Request(
- method=b"CONNECT",
- url=connect_url,
- headers=connect_headers,
- extensions=request.extensions,
- )
- connect_response = await self._connection.handle_async_request(
- connect_request
- )
-
- if connect_response.status < 200 or connect_response.status > 299:
- reason_bytes = connect_response.extensions.get("reason_phrase", b"")
- reason_str = reason_bytes.decode("ascii", errors="ignore")
- msg = "%d %s" % (connect_response.status, reason_str)
- await self._connection.aclose()
- raise ProxyError(msg)
-
- stream = connect_response.extensions["network_stream"]
-
- # Upgrade the stream to SSL
- ssl_context = (
- default_ssl_context()
- if self._ssl_context is None
- else self._ssl_context
- )
- alpn_protocols = ["http/1.1", "h2"] if self._http2 else ["http/1.1"]
- ssl_context.set_alpn_protocols(alpn_protocols)
-
- kwargs = {
- "ssl_context": ssl_context,
- "server_hostname": self._remote_origin.host.decode("ascii"),
- "timeout": timeout,
- }
- async with Trace("start_tls", logger, request, kwargs) as trace:
- stream = await stream.start_tls(**kwargs)
- trace.return_value = stream
-
- # Determine if we should be using HTTP/1.1 or HTTP/2
- ssl_object = stream.get_extra_info("ssl_object")
- http2_negotiated = (
- ssl_object is not None
- and ssl_object.selected_alpn_protocol() == "h2"
- )
-
- # Create the HTTP/1.1 or HTTP/2 connection
- if http2_negotiated or (self._http2 and not self._http1):
- from .http2 import AsyncHTTP2Connection
-
- self._connection = AsyncHTTP2Connection(
- origin=self._remote_origin,
- stream=stream,
- keepalive_expiry=self._keepalive_expiry,
- )
- else:
- self._connection = AsyncHTTP11Connection(
- origin=self._remote_origin,
- stream=stream,
- keepalive_expiry=self._keepalive_expiry,
- )
-
- self._connected = True
- return await self._connection.handle_async_request(request)
-
- def can_handle_request(self, origin: Origin) -> bool:
- return origin == self._remote_origin
-
- async def aclose(self) -> None:
- await self._connection.aclose()
-
- def info(self) -> str:
- return self._connection.info()
-
- def is_available(self) -> bool:
- return self._connection.is_available()
-
- def has_expired(self) -> bool:
- return self._connection.has_expired()
-
- def is_idle(self) -> bool:
- return self._connection.is_idle()
-
- def is_closed(self) -> bool:
- return self._connection.is_closed()
-
- def __repr__(self) -> str:
- return f"<{self.__class__.__name__} [{self.info()}]>"
diff --git a/contrib/python/httpcore/httpcore/_async/interfaces.py b/contrib/python/httpcore/httpcore/_async/interfaces.py
deleted file mode 100644
index 361583bede6..00000000000
--- a/contrib/python/httpcore/httpcore/_async/interfaces.py
+++ /dev/null
@@ -1,137 +0,0 @@
-from __future__ import annotations
-
-import contextlib
-import typing
-
-from .._models import (
- URL,
- Extensions,
- HeaderTypes,
- Origin,
- Request,
- Response,
- enforce_bytes,
- enforce_headers,
- enforce_url,
- include_request_headers,
-)
-
-
-class AsyncRequestInterface:
- async def request(
- self,
- method: bytes | str,
- url: URL | bytes | str,
- *,
- headers: HeaderTypes = None,
- content: bytes | typing.AsyncIterator[bytes] | None = None,
- extensions: Extensions | None = None,
- ) -> Response:
- # Strict type checking on our parameters.
- method = enforce_bytes(method, name="method")
- url = enforce_url(url, name="url")
- headers = enforce_headers(headers, name="headers")
-
- # Include Host header, and optionally Content-Length or Transfer-Encoding.
- headers = include_request_headers(headers, url=url, content=content)
-
- request = Request(
- method=method,
- url=url,
- headers=headers,
- content=content,
- extensions=extensions,
- )
- response = await self.handle_async_request(request)
- try:
- await response.aread()
- finally:
- await response.aclose()
- return response
-
- @contextlib.asynccontextmanager
- async def stream(
- self,
- method: bytes | str,
- url: URL | bytes | str,
- *,
- headers: HeaderTypes = None,
- content: bytes | typing.AsyncIterator[bytes] | None = None,
- extensions: Extensions | None = None,
- ) -> typing.AsyncIterator[Response]:
- # Strict type checking on our parameters.
- method = enforce_bytes(method, name="method")
- url = enforce_url(url, name="url")
- headers = enforce_headers(headers, name="headers")
-
- # Include Host header, and optionally Content-Length or Transfer-Encoding.
- headers = include_request_headers(headers, url=url, content=content)
-
- request = Request(
- method=method,
- url=url,
- headers=headers,
- content=content,
- extensions=extensions,
- )
- response = await self.handle_async_request(request)
- try:
- yield response
- finally:
- await response.aclose()
-
- async def handle_async_request(self, request: Request) -> Response:
- raise NotImplementedError() # pragma: nocover
-
-
-class AsyncConnectionInterface(AsyncRequestInterface):
- async def aclose(self) -> None:
- raise NotImplementedError() # pragma: nocover
-
- def info(self) -> str:
- raise NotImplementedError() # pragma: nocover
-
- def can_handle_request(self, origin: Origin) -> bool:
- raise NotImplementedError() # pragma: nocover
-
- def is_available(self) -> bool:
- """
- Return `True` if the connection is currently able to accept an
- outgoing request.
-
- An HTTP/1.1 connection will only be available if it is currently idle.
-
- An HTTP/2 connection will be available so long as the stream ID space is
- not yet exhausted, and the connection is not in an error state.
-
- While the connection is being established we may not yet know if it is going
- to result in an HTTP/1.1 or HTTP/2 connection. The connection should be
- treated as being available, but might ultimately raise `NewConnectionRequired`
- required exceptions if multiple requests are attempted over a connection
- that ends up being established as HTTP/1.1.
- """
- raise NotImplementedError() # pragma: nocover
-
- def has_expired(self) -> bool:
- """
- Return `True` if the connection is in a state where it should be closed.
-
- This either means that the connection is idle and it has passed the
- expiry time on its keep-alive, or that server has sent an EOF.
- """
- raise NotImplementedError() # pragma: nocover
-
- def is_idle(self) -> bool:
- """
- Return `True` if the connection is currently idle.
- """
- raise NotImplementedError() # pragma: nocover
-
- def is_closed(self) -> bool:
- """
- Return `True` if the connection has been closed.
-
- Used when a response is closed to determine if the connection may be
- returned to the connection pool or not.
- """
- raise NotImplementedError() # pragma: nocover
diff --git a/contrib/python/httpcore/httpcore/_async/socks_proxy.py b/contrib/python/httpcore/httpcore/_async/socks_proxy.py
deleted file mode 100644
index b363f55a0b0..00000000000
--- a/contrib/python/httpcore/httpcore/_async/socks_proxy.py
+++ /dev/null
@@ -1,341 +0,0 @@
-from __future__ import annotations
-
-import logging
-import ssl
-
-import socksio
-
-from .._backends.auto import AutoBackend
-from .._backends.base import AsyncNetworkBackend, AsyncNetworkStream
-from .._exceptions import ConnectionNotAvailable, ProxyError
-from .._models import URL, Origin, Request, Response, enforce_bytes, enforce_url
-from .._ssl import default_ssl_context
-from .._synchronization import AsyncLock
-from .._trace import Trace
-from .connection_pool import AsyncConnectionPool
-from .http11 import AsyncHTTP11Connection
-from .interfaces import AsyncConnectionInterface
-
-logger = logging.getLogger("httpcore.socks")
-
-
-AUTH_METHODS = {
- b"\x00": "NO AUTHENTICATION REQUIRED",
- b"\x01": "GSSAPI",
- b"\x02": "USERNAME/PASSWORD",
- b"\xff": "NO ACCEPTABLE METHODS",
-}
-
-REPLY_CODES = {
- b"\x00": "Succeeded",
- b"\x01": "General SOCKS server failure",
- b"\x02": "Connection not allowed by ruleset",
- b"\x03": "Network unreachable",
- b"\x04": "Host unreachable",
- b"\x05": "Connection refused",
- b"\x06": "TTL expired",
- b"\x07": "Command not supported",
- b"\x08": "Address type not supported",
-}
-
-
-async def _init_socks5_connection(
- stream: AsyncNetworkStream,
- *,
- host: bytes,
- port: int,
- auth: tuple[bytes, bytes] | None = None,
-) -> None:
- conn = socksio.socks5.SOCKS5Connection()
-
- # Auth method request
- auth_method = (
- socksio.socks5.SOCKS5AuthMethod.NO_AUTH_REQUIRED
- if auth is None
- else socksio.socks5.SOCKS5AuthMethod.USERNAME_PASSWORD
- )
- conn.send(socksio.socks5.SOCKS5AuthMethodsRequest([auth_method]))
- outgoing_bytes = conn.data_to_send()
- await stream.write(outgoing_bytes)
-
- # Auth method response
- incoming_bytes = await stream.read(max_bytes=4096)
- response = conn.receive_data(incoming_bytes)
- assert isinstance(response, socksio.socks5.SOCKS5AuthReply)
- if response.method != auth_method:
- requested = AUTH_METHODS.get(auth_method, "UNKNOWN")
- responded = AUTH_METHODS.get(response.method, "UNKNOWN")
- raise ProxyError(
- f"Requested {requested} from proxy server, but got {responded}."
- )
-
- if response.method == socksio.socks5.SOCKS5AuthMethod.USERNAME_PASSWORD:
- # Username/password request
- assert auth is not None
- username, password = auth
- conn.send(socksio.socks5.SOCKS5UsernamePasswordRequest(username, password))
- outgoing_bytes = conn.data_to_send()
- await stream.write(outgoing_bytes)
-
- # Username/password response
- incoming_bytes = await stream.read(max_bytes=4096)
- response = conn.receive_data(incoming_bytes)
- assert isinstance(response, socksio.socks5.SOCKS5UsernamePasswordReply)
- if not response.success:
- raise ProxyError("Invalid username/password")
-
- # Connect request
- conn.send(
- socksio.socks5.SOCKS5CommandRequest.from_address(
- socksio.socks5.SOCKS5Command.CONNECT, (host, port)
- )
- )
- outgoing_bytes = conn.data_to_send()
- await stream.write(outgoing_bytes)
-
- # Connect response
- incoming_bytes = await stream.read(max_bytes=4096)
- response = conn.receive_data(incoming_bytes)
- assert isinstance(response, socksio.socks5.SOCKS5Reply)
- if response.reply_code != socksio.socks5.SOCKS5ReplyCode.SUCCEEDED:
- reply_code = REPLY_CODES.get(response.reply_code, "UNKOWN")
- raise ProxyError(f"Proxy Server could not connect: {reply_code}.")
-
-
-class AsyncSOCKSProxy(AsyncConnectionPool): # pragma: nocover
- """
- A connection pool that sends requests via an HTTP proxy.
- """
-
- def __init__(
- self,
- proxy_url: URL | bytes | str,
- proxy_auth: tuple[bytes | str, bytes | str] | None = None,
- ssl_context: ssl.SSLContext | None = None,
- max_connections: int | None = 10,
- max_keepalive_connections: int | None = None,
- keepalive_expiry: float | None = None,
- http1: bool = True,
- http2: bool = False,
- retries: int = 0,
- network_backend: AsyncNetworkBackend | None = None,
- ) -> None:
- """
- A connection pool for making HTTP requests.
-
- Parameters:
- proxy_url: The URL to use when connecting to the proxy server.
- For example `"http://127.0.0.1:8080/"`.
- ssl_context: An SSL context to use for verifying connections.
- If not specified, the default `httpcore.default_ssl_context()`
- will be used.
- max_connections: The maximum number of concurrent HTTP connections that
- the pool should allow. Any attempt to send a request on a pool that
- would exceed this amount will block until a connection is available.
- max_keepalive_connections: The maximum number of idle HTTP connections
- that will be maintained in the pool.
- keepalive_expiry: The duration in seconds that an idle HTTP connection
- may be maintained for before being expired from the pool.
- http1: A boolean indicating if HTTP/1.1 requests should be supported
- by the connection pool. Defaults to True.
- http2: A boolean indicating if HTTP/2 requests should be supported by
- the connection pool. Defaults to False.
- retries: The maximum number of retries when trying to establish
- a connection.
- local_address: Local address to connect from. Can also be used to
- connect using a particular address family. Using
- `local_address="0.0.0.0"` will connect using an `AF_INET` address
- (IPv4), while using `local_address="::"` will connect using an
- `AF_INET6` address (IPv6).
- uds: Path to a Unix Domain Socket to use instead of TCP sockets.
- network_backend: A backend instance to use for handling network I/O.
- """
- super().__init__(
- ssl_context=ssl_context,
- max_connections=max_connections,
- max_keepalive_connections=max_keepalive_connections,
- keepalive_expiry=keepalive_expiry,
- http1=http1,
- http2=http2,
- network_backend=network_backend,
- retries=retries,
- )
- self._ssl_context = ssl_context
- self._proxy_url = enforce_url(proxy_url, name="proxy_url")
- if proxy_auth is not None:
- username, password = proxy_auth
- username_bytes = enforce_bytes(username, name="proxy_auth")
- password_bytes = enforce_bytes(password, name="proxy_auth")
- self._proxy_auth: tuple[bytes, bytes] | None = (
- username_bytes,
- password_bytes,
- )
- else:
- self._proxy_auth = None
-
- def create_connection(self, origin: Origin) -> AsyncConnectionInterface:
- return AsyncSocks5Connection(
- proxy_origin=self._proxy_url.origin,
- remote_origin=origin,
- proxy_auth=self._proxy_auth,
- ssl_context=self._ssl_context,
- keepalive_expiry=self._keepalive_expiry,
- http1=self._http1,
- http2=self._http2,
- network_backend=self._network_backend,
- )
-
-
-class AsyncSocks5Connection(AsyncConnectionInterface):
- def __init__(
- self,
- proxy_origin: Origin,
- remote_origin: Origin,
- proxy_auth: tuple[bytes, bytes] | None = None,
- ssl_context: ssl.SSLContext | None = None,
- keepalive_expiry: float | None = None,
- http1: bool = True,
- http2: bool = False,
- network_backend: AsyncNetworkBackend | None = None,
- ) -> None:
- self._proxy_origin = proxy_origin
- self._remote_origin = remote_origin
- self._proxy_auth = proxy_auth
- self._ssl_context = ssl_context
- self._keepalive_expiry = keepalive_expiry
- self._http1 = http1
- self._http2 = http2
-
- self._network_backend: AsyncNetworkBackend = (
- AutoBackend() if network_backend is None else network_backend
- )
- self._connect_lock = AsyncLock()
- self._connection: AsyncConnectionInterface | None = None
- self._connect_failed = False
-
- async def handle_async_request(self, request: Request) -> Response:
- timeouts = request.extensions.get("timeout", {})
- sni_hostname = request.extensions.get("sni_hostname", None)
- timeout = timeouts.get("connect", None)
-
- async with self._connect_lock:
- if self._connection is None:
- try:
- # Connect to the proxy
- kwargs = {
- "host": self._proxy_origin.host.decode("ascii"),
- "port": self._proxy_origin.port,
- "timeout": timeout,
- }
- async with Trace("connect_tcp", logger, request, kwargs) as trace:
- stream = await self._network_backend.connect_tcp(**kwargs)
- trace.return_value = stream
-
- # Connect to the remote host using socks5
- kwargs = {
- "stream": stream,
- "host": self._remote_origin.host.decode("ascii"),
- "port": self._remote_origin.port,
- "auth": self._proxy_auth,
- }
- async with Trace(
- "setup_socks5_connection", logger, request, kwargs
- ) as trace:
- await _init_socks5_connection(**kwargs)
- trace.return_value = stream
-
- # Upgrade the stream to SSL
- if self._remote_origin.scheme == b"https":
- ssl_context = (
- default_ssl_context()
- if self._ssl_context is None
- else self._ssl_context
- )
- alpn_protocols = (
- ["http/1.1", "h2"] if self._http2 else ["http/1.1"]
- )
- ssl_context.set_alpn_protocols(alpn_protocols)
-
- kwargs = {
- "ssl_context": ssl_context,
- "server_hostname": sni_hostname
- or self._remote_origin.host.decode("ascii"),
- "timeout": timeout,
- }
- async with Trace("start_tls", logger, request, kwargs) as trace:
- stream = await stream.start_tls(**kwargs)
- trace.return_value = stream
-
- # Determine if we should be using HTTP/1.1 or HTTP/2
- ssl_object = stream.get_extra_info("ssl_object")
- http2_negotiated = (
- ssl_object is not None
- and ssl_object.selected_alpn_protocol() == "h2"
- )
-
- # Create the HTTP/1.1 or HTTP/2 connection
- if http2_negotiated or (
- self._http2 and not self._http1
- ): # pragma: nocover
- from .http2 import AsyncHTTP2Connection
-
- self._connection = AsyncHTTP2Connection(
- origin=self._remote_origin,
- stream=stream,
- keepalive_expiry=self._keepalive_expiry,
- )
- else:
- self._connection = AsyncHTTP11Connection(
- origin=self._remote_origin,
- stream=stream,
- keepalive_expiry=self._keepalive_expiry,
- )
- except Exception as exc:
- self._connect_failed = True
- raise exc
- elif not self._connection.is_available(): # pragma: nocover
- raise ConnectionNotAvailable()
-
- return await self._connection.handle_async_request(request)
-
- def can_handle_request(self, origin: Origin) -> bool:
- return origin == self._remote_origin
-
- async def aclose(self) -> None:
- if self._connection is not None:
- await self._connection.aclose()
-
- def is_available(self) -> bool:
- if self._connection is None: # pragma: nocover
- # If HTTP/2 support is enabled, and the resulting connection could
- # end up as HTTP/2 then we should indicate the connection as being
- # available to service multiple requests.
- return (
- self._http2
- and (self._remote_origin.scheme == b"https" or not self._http1)
- and not self._connect_failed
- )
- return self._connection.is_available()
-
- def has_expired(self) -> bool:
- if self._connection is None: # pragma: nocover
- return self._connect_failed
- return self._connection.has_expired()
-
- def is_idle(self) -> bool:
- if self._connection is None: # pragma: nocover
- return self._connect_failed
- return self._connection.is_idle()
-
- def is_closed(self) -> bool:
- if self._connection is None: # pragma: nocover
- return self._connect_failed
- return self._connection.is_closed()
-
- def info(self) -> str:
- if self._connection is None: # pragma: nocover
- return "CONNECTION FAILED" if self._connect_failed else "CONNECTING"
- return self._connection.info()
-
- def __repr__(self) -> str:
- return f"<{self.__class__.__name__} [{self.info()}]>"
diff --git a/contrib/python/httpcore/httpcore/_backends/__init__.py b/contrib/python/httpcore/httpcore/_backends/__init__.py
deleted file mode 100644
index e69de29bb2d..00000000000
--- a/contrib/python/httpcore/httpcore/_backends/__init__.py
+++ /dev/null
diff --git a/contrib/python/httpcore/httpcore/_backends/anyio.py b/contrib/python/httpcore/httpcore/_backends/anyio.py
deleted file mode 100644
index a140095e1b8..00000000000
--- a/contrib/python/httpcore/httpcore/_backends/anyio.py
+++ /dev/null
@@ -1,146 +0,0 @@
-from __future__ import annotations
-
-import ssl
-import typing
-
-import anyio
-
-from .._exceptions import (
- ConnectError,
- ConnectTimeout,
- ReadError,
- ReadTimeout,
- WriteError,
- WriteTimeout,
- map_exceptions,
-)
-from .._utils import is_socket_readable
-from .base import SOCKET_OPTION, AsyncNetworkBackend, AsyncNetworkStream
-
-
-class AnyIOStream(AsyncNetworkStream):
- def __init__(self, stream: anyio.abc.ByteStream) -> None:
- self._stream = stream
-
- async def read(self, max_bytes: int, timeout: float | None = None) -> bytes:
- exc_map = {
- TimeoutError: ReadTimeout,
- anyio.BrokenResourceError: ReadError,
- anyio.ClosedResourceError: ReadError,
- anyio.EndOfStream: ReadError,
- }
- with map_exceptions(exc_map):
- with anyio.fail_after(timeout):
- try:
- return await self._stream.receive(max_bytes=max_bytes)
- except anyio.EndOfStream: # pragma: nocover
- return b""
-
- async def write(self, buffer: bytes, timeout: float | None = None) -> None:
- if not buffer:
- return
-
- exc_map = {
- TimeoutError: WriteTimeout,
- anyio.BrokenResourceError: WriteError,
- anyio.ClosedResourceError: WriteError,
- }
- with map_exceptions(exc_map):
- with anyio.fail_after(timeout):
- await self._stream.send(item=buffer)
-
- async def aclose(self) -> None:
- await self._stream.aclose()
-
- async def start_tls(
- self,
- ssl_context: ssl.SSLContext,
- server_hostname: str | None = None,
- timeout: float | None = None,
- ) -> AsyncNetworkStream:
- exc_map = {
- TimeoutError: ConnectTimeout,
- anyio.BrokenResourceError: ConnectError,
- anyio.EndOfStream: ConnectError,
- ssl.SSLError: ConnectError,
- }
- with map_exceptions(exc_map):
- try:
- with anyio.fail_after(timeout):
- ssl_stream = await anyio.streams.tls.TLSStream.wrap(
- self._stream,
- ssl_context=ssl_context,
- hostname=server_hostname,
- standard_compatible=False,
- server_side=False,
- )
- except Exception as exc: # pragma: nocover
- await self.aclose()
- raise exc
- return AnyIOStream(ssl_stream)
-
- def get_extra_info(self, info: str) -> typing.Any:
- if info == "ssl_object":
- return self._stream.extra(anyio.streams.tls.TLSAttribute.ssl_object, None)
- if info == "client_addr":
- return self._stream.extra(anyio.abc.SocketAttribute.local_address, None)
- if info == "server_addr":
- return self._stream.extra(anyio.abc.SocketAttribute.remote_address, None)
- if info == "socket":
- return self._stream.extra(anyio.abc.SocketAttribute.raw_socket, None)
- if info == "is_readable":
- sock = self._stream.extra(anyio.abc.SocketAttribute.raw_socket, None)
- return is_socket_readable(sock)
- return None
-
-
-class AnyIOBackend(AsyncNetworkBackend):
- async def connect_tcp(
- self,
- host: str,
- port: int,
- timeout: float | None = None,
- local_address: str | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> AsyncNetworkStream: # pragma: nocover
- if socket_options is None:
- socket_options = []
- exc_map = {
- TimeoutError: ConnectTimeout,
- OSError: ConnectError,
- anyio.BrokenResourceError: ConnectError,
- }
- with map_exceptions(exc_map):
- with anyio.fail_after(timeout):
- stream: anyio.abc.ByteStream = await anyio.connect_tcp(
- remote_host=host,
- remote_port=port,
- local_host=local_address,
- )
- # By default TCP sockets opened in `asyncio` include TCP_NODELAY.
- for option in socket_options:
- stream._raw_socket.setsockopt(*option) # type: ignore[attr-defined] # pragma: no cover
- return AnyIOStream(stream)
-
- async def connect_unix_socket(
- self,
- path: str,
- timeout: float | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> AsyncNetworkStream: # pragma: nocover
- if socket_options is None:
- socket_options = []
- exc_map = {
- TimeoutError: ConnectTimeout,
- OSError: ConnectError,
- anyio.BrokenResourceError: ConnectError,
- }
- with map_exceptions(exc_map):
- with anyio.fail_after(timeout):
- stream: anyio.abc.ByteStream = await anyio.connect_unix(path)
- for option in socket_options:
- stream._raw_socket.setsockopt(*option) # type: ignore[attr-defined] # pragma: no cover
- return AnyIOStream(stream)
-
- async def sleep(self, seconds: float) -> None:
- await anyio.sleep(seconds) # pragma: nocover
diff --git a/contrib/python/httpcore/httpcore/_backends/auto.py b/contrib/python/httpcore/httpcore/_backends/auto.py
deleted file mode 100644
index 49f0e698c97..00000000000
--- a/contrib/python/httpcore/httpcore/_backends/auto.py
+++ /dev/null
@@ -1,52 +0,0 @@
-from __future__ import annotations
-
-import typing
-
-from .._synchronization import current_async_library
-from .base import SOCKET_OPTION, AsyncNetworkBackend, AsyncNetworkStream
-
-
-class AutoBackend(AsyncNetworkBackend):
- async def _init_backend(self) -> None:
- if not (hasattr(self, "_backend")):
- backend = current_async_library()
- if backend == "trio":
- from .trio import TrioBackend
-
- self._backend: AsyncNetworkBackend = TrioBackend()
- else:
- from .anyio import AnyIOBackend
-
- self._backend = AnyIOBackend()
-
- async def connect_tcp(
- self,
- host: str,
- port: int,
- timeout: float | None = None,
- local_address: str | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> AsyncNetworkStream:
- await self._init_backend()
- return await self._backend.connect_tcp(
- host,
- port,
- timeout=timeout,
- local_address=local_address,
- socket_options=socket_options,
- )
-
- async def connect_unix_socket(
- self,
- path: str,
- timeout: float | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> AsyncNetworkStream: # pragma: nocover
- await self._init_backend()
- return await self._backend.connect_unix_socket(
- path, timeout=timeout, socket_options=socket_options
- )
-
- async def sleep(self, seconds: float) -> None: # pragma: nocover
- await self._init_backend()
- return await self._backend.sleep(seconds)
diff --git a/contrib/python/httpcore/httpcore/_backends/base.py b/contrib/python/httpcore/httpcore/_backends/base.py
deleted file mode 100644
index cf55c8b10eb..00000000000
--- a/contrib/python/httpcore/httpcore/_backends/base.py
+++ /dev/null
@@ -1,101 +0,0 @@
-from __future__ import annotations
-
-import ssl
-import time
-import typing
-
-SOCKET_OPTION = typing.Union[
- typing.Tuple[int, int, int],
- typing.Tuple[int, int, typing.Union[bytes, bytearray]],
- typing.Tuple[int, int, None, int],
-]
-
-
-class NetworkStream:
- def read(self, max_bytes: int, timeout: float | None = None) -> bytes:
- raise NotImplementedError() # pragma: nocover
-
- def write(self, buffer: bytes, timeout: float | None = None) -> None:
- raise NotImplementedError() # pragma: nocover
-
- def close(self) -> None:
- raise NotImplementedError() # pragma: nocover
-
- def start_tls(
- self,
- ssl_context: ssl.SSLContext,
- server_hostname: str | None = None,
- timeout: float | None = None,
- ) -> NetworkStream:
- raise NotImplementedError() # pragma: nocover
-
- def get_extra_info(self, info: str) -> typing.Any:
- return None # pragma: nocover
-
-
-class NetworkBackend:
- def connect_tcp(
- self,
- host: str,
- port: int,
- timeout: float | None = None,
- local_address: str | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> NetworkStream:
- raise NotImplementedError() # pragma: nocover
-
- def connect_unix_socket(
- self,
- path: str,
- timeout: float | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> NetworkStream:
- raise NotImplementedError() # pragma: nocover
-
- def sleep(self, seconds: float) -> None:
- time.sleep(seconds) # pragma: nocover
-
-
-class AsyncNetworkStream:
- async def read(self, max_bytes: int, timeout: float | None = None) -> bytes:
- raise NotImplementedError() # pragma: nocover
-
- async def write(self, buffer: bytes, timeout: float | None = None) -> None:
- raise NotImplementedError() # pragma: nocover
-
- async def aclose(self) -> None:
- raise NotImplementedError() # pragma: nocover
-
- async def start_tls(
- self,
- ssl_context: ssl.SSLContext,
- server_hostname: str | None = None,
- timeout: float | None = None,
- ) -> AsyncNetworkStream:
- raise NotImplementedError() # pragma: nocover
-
- def get_extra_info(self, info: str) -> typing.Any:
- return None # pragma: nocover
-
-
-class AsyncNetworkBackend:
- async def connect_tcp(
- self,
- host: str,
- port: int,
- timeout: float | None = None,
- local_address: str | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> AsyncNetworkStream:
- raise NotImplementedError() # pragma: nocover
-
- async def connect_unix_socket(
- self,
- path: str,
- timeout: float | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> AsyncNetworkStream:
- raise NotImplementedError() # pragma: nocover
-
- async def sleep(self, seconds: float) -> None:
- raise NotImplementedError() # pragma: nocover
diff --git a/contrib/python/httpcore/httpcore/_backends/mock.py b/contrib/python/httpcore/httpcore/_backends/mock.py
deleted file mode 100644
index 9b6edca03d4..00000000000
--- a/contrib/python/httpcore/httpcore/_backends/mock.py
+++ /dev/null
@@ -1,143 +0,0 @@
-from __future__ import annotations
-
-import ssl
-import typing
-
-from .._exceptions import ReadError
-from .base import (
- SOCKET_OPTION,
- AsyncNetworkBackend,
- AsyncNetworkStream,
- NetworkBackend,
- NetworkStream,
-)
-
-
-class MockSSLObject:
- def __init__(self, http2: bool):
- self._http2 = http2
-
- def selected_alpn_protocol(self) -> str:
- return "h2" if self._http2 else "http/1.1"
-
-
-class MockStream(NetworkStream):
- def __init__(self, buffer: list[bytes], http2: bool = False) -> None:
- self._buffer = buffer
- self._http2 = http2
- self._closed = False
-
- def read(self, max_bytes: int, timeout: float | None = None) -> bytes:
- if self._closed:
- raise ReadError("Connection closed")
- if not self._buffer:
- return b""
- return self._buffer.pop(0)
-
- def write(self, buffer: bytes, timeout: float | None = None) -> None:
- pass
-
- def close(self) -> None:
- self._closed = True
-
- def start_tls(
- self,
- ssl_context: ssl.SSLContext,
- server_hostname: str | None = None,
- timeout: float | None = None,
- ) -> NetworkStream:
- return self
-
- def get_extra_info(self, info: str) -> typing.Any:
- return MockSSLObject(http2=self._http2) if info == "ssl_object" else None
-
- def __repr__(self) -> str:
- return "<httpcore.MockStream>"
-
-
-class MockBackend(NetworkBackend):
- def __init__(self, buffer: list[bytes], http2: bool = False) -> None:
- self._buffer = buffer
- self._http2 = http2
-
- def connect_tcp(
- self,
- host: str,
- port: int,
- timeout: float | None = None,
- local_address: str | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> NetworkStream:
- return MockStream(list(self._buffer), http2=self._http2)
-
- def connect_unix_socket(
- self,
- path: str,
- timeout: float | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> NetworkStream:
- return MockStream(list(self._buffer), http2=self._http2)
-
- def sleep(self, seconds: float) -> None:
- pass
-
-
-class AsyncMockStream(AsyncNetworkStream):
- def __init__(self, buffer: list[bytes], http2: bool = False) -> None:
- self._buffer = buffer
- self._http2 = http2
- self._closed = False
-
- async def read(self, max_bytes: int, timeout: float | None = None) -> bytes:
- if self._closed:
- raise ReadError("Connection closed")
- if not self._buffer:
- return b""
- return self._buffer.pop(0)
-
- async def write(self, buffer: bytes, timeout: float | None = None) -> None:
- pass
-
- async def aclose(self) -> None:
- self._closed = True
-
- async def start_tls(
- self,
- ssl_context: ssl.SSLContext,
- server_hostname: str | None = None,
- timeout: float | None = None,
- ) -> AsyncNetworkStream:
- return self
-
- def get_extra_info(self, info: str) -> typing.Any:
- return MockSSLObject(http2=self._http2) if info == "ssl_object" else None
-
- def __repr__(self) -> str:
- return "<httpcore.AsyncMockStream>"
-
-
-class AsyncMockBackend(AsyncNetworkBackend):
- def __init__(self, buffer: list[bytes], http2: bool = False) -> None:
- self._buffer = buffer
- self._http2 = http2
-
- async def connect_tcp(
- self,
- host: str,
- port: int,
- timeout: float | None = None,
- local_address: str | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> AsyncNetworkStream:
- return AsyncMockStream(list(self._buffer), http2=self._http2)
-
- async def connect_unix_socket(
- self,
- path: str,
- timeout: float | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> AsyncNetworkStream:
- return AsyncMockStream(list(self._buffer), http2=self._http2)
-
- async def sleep(self, seconds: float) -> None:
- pass
diff --git a/contrib/python/httpcore/httpcore/_backends/sync.py b/contrib/python/httpcore/httpcore/_backends/sync.py
deleted file mode 100644
index 4018a09c6fb..00000000000
--- a/contrib/python/httpcore/httpcore/_backends/sync.py
+++ /dev/null
@@ -1,241 +0,0 @@
-from __future__ import annotations
-
-import functools
-import socket
-import ssl
-import sys
-import typing
-
-from .._exceptions import (
- ConnectError,
- ConnectTimeout,
- ExceptionMapping,
- ReadError,
- ReadTimeout,
- WriteError,
- WriteTimeout,
- map_exceptions,
-)
-from .._utils import is_socket_readable
-from .base import SOCKET_OPTION, NetworkBackend, NetworkStream
-
-
-class TLSinTLSStream(NetworkStream): # pragma: no cover
- """
- Because the standard `SSLContext.wrap_socket` method does
- not work for `SSLSocket` objects, we need this class
- to implement TLS stream using an underlying `SSLObject`
- instance in order to support TLS on top of TLS.
- """
-
- # Defined in RFC 8449
- TLS_RECORD_SIZE = 16384
-
- def __init__(
- self,
- sock: socket.socket,
- ssl_context: ssl.SSLContext,
- server_hostname: str | None = None,
- timeout: float | None = None,
- ):
- self._sock = sock
- self._incoming = ssl.MemoryBIO()
- self._outgoing = ssl.MemoryBIO()
-
- self.ssl_obj = ssl_context.wrap_bio(
- incoming=self._incoming,
- outgoing=self._outgoing,
- server_hostname=server_hostname,
- )
-
- self._sock.settimeout(timeout)
- self._perform_io(self.ssl_obj.do_handshake)
-
- def _perform_io(
- self,
- func: typing.Callable[..., typing.Any],
- ) -> typing.Any:
- ret = None
-
- while True:
- errno = None
- try:
- ret = func()
- except (ssl.SSLWantReadError, ssl.SSLWantWriteError) as e:
- errno = e.errno
-
- self._sock.sendall(self._outgoing.read())
-
- if errno == ssl.SSL_ERROR_WANT_READ:
- buf = self._sock.recv(self.TLS_RECORD_SIZE)
-
- if buf:
- self._incoming.write(buf)
- else:
- self._incoming.write_eof()
- if errno is None:
- return ret
-
- def read(self, max_bytes: int, timeout: float | None = None) -> bytes:
- exc_map: ExceptionMapping = {socket.timeout: ReadTimeout, OSError: ReadError}
- with map_exceptions(exc_map):
- self._sock.settimeout(timeout)
- return typing.cast(
- bytes, self._perform_io(functools.partial(self.ssl_obj.read, max_bytes))
- )
-
- def write(self, buffer: bytes, timeout: float | None = None) -> None:
- exc_map: ExceptionMapping = {socket.timeout: WriteTimeout, OSError: WriteError}
- with map_exceptions(exc_map):
- self._sock.settimeout(timeout)
- while buffer:
- nsent = self._perform_io(functools.partial(self.ssl_obj.write, buffer))
- buffer = buffer[nsent:]
-
- def close(self) -> None:
- self._sock.close()
-
- def start_tls(
- self,
- ssl_context: ssl.SSLContext,
- server_hostname: str | None = None,
- timeout: float | None = None,
- ) -> NetworkStream:
- raise NotImplementedError()
-
- def get_extra_info(self, info: str) -> typing.Any:
- if info == "ssl_object":
- return self.ssl_obj
- if info == "client_addr":
- return self._sock.getsockname()
- if info == "server_addr":
- return self._sock.getpeername()
- if info == "socket":
- return self._sock
- if info == "is_readable":
- return is_socket_readable(self._sock)
- return None
-
-
-class SyncStream(NetworkStream):
- def __init__(self, sock: socket.socket) -> None:
- self._sock = sock
-
- def read(self, max_bytes: int, timeout: float | None = None) -> bytes:
- exc_map: ExceptionMapping = {socket.timeout: ReadTimeout, OSError: ReadError}
- with map_exceptions(exc_map):
- self._sock.settimeout(timeout)
- return self._sock.recv(max_bytes)
-
- def write(self, buffer: bytes, timeout: float | None = None) -> None:
- if not buffer:
- return
-
- exc_map: ExceptionMapping = {socket.timeout: WriteTimeout, OSError: WriteError}
- with map_exceptions(exc_map):
- while buffer:
- self._sock.settimeout(timeout)
- n = self._sock.send(buffer)
- buffer = buffer[n:]
-
- def close(self) -> None:
- self._sock.close()
-
- def start_tls(
- self,
- ssl_context: ssl.SSLContext,
- server_hostname: str | None = None,
- timeout: float | None = None,
- ) -> NetworkStream:
- exc_map: ExceptionMapping = {
- socket.timeout: ConnectTimeout,
- OSError: ConnectError,
- }
- with map_exceptions(exc_map):
- try:
- if isinstance(self._sock, ssl.SSLSocket): # pragma: no cover
- # If the underlying socket has already been upgraded
- # to the TLS layer (i.e. is an instance of SSLSocket),
- # we need some additional smarts to support TLS-in-TLS.
- return TLSinTLSStream(
- self._sock, ssl_context, server_hostname, timeout
- )
- else:
- self._sock.settimeout(timeout)
- sock = ssl_context.wrap_socket(
- self._sock, server_hostname=server_hostname
- )
- except Exception as exc: # pragma: nocover
- self.close()
- raise exc
- return SyncStream(sock)
-
- def get_extra_info(self, info: str) -> typing.Any:
- if info == "ssl_object" and isinstance(self._sock, ssl.SSLSocket):
- return self._sock._sslobj # type: ignore
- if info == "client_addr":
- return self._sock.getsockname()
- if info == "server_addr":
- return self._sock.getpeername()
- if info == "socket":
- return self._sock
- if info == "is_readable":
- return is_socket_readable(self._sock)
- return None
-
-
-class SyncBackend(NetworkBackend):
- def connect_tcp(
- self,
- host: str,
- port: int,
- timeout: float | None = None,
- local_address: str | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> NetworkStream:
- # Note that we automatically include `TCP_NODELAY`
- # in addition to any other custom socket options.
- if socket_options is None:
- socket_options = [] # pragma: no cover
- address = (host, port)
- source_address = None if local_address is None else (local_address, 0)
- exc_map: ExceptionMapping = {
- socket.timeout: ConnectTimeout,
- OSError: ConnectError,
- }
-
- with map_exceptions(exc_map):
- sock = socket.create_connection(
- address,
- timeout,
- source_address=source_address,
- )
- for option in socket_options:
- sock.setsockopt(*option) # pragma: no cover
- sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
- return SyncStream(sock)
-
- def connect_unix_socket(
- self,
- path: str,
- timeout: float | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> NetworkStream: # pragma: nocover
- if sys.platform == "win32":
- raise RuntimeError(
- "Attempted to connect to a UNIX socket on a Windows system."
- )
- if socket_options is None:
- socket_options = []
-
- exc_map: ExceptionMapping = {
- socket.timeout: ConnectTimeout,
- OSError: ConnectError,
- }
- with map_exceptions(exc_map):
- sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- for option in socket_options:
- sock.setsockopt(*option)
- sock.settimeout(timeout)
- sock.connect(path)
- return SyncStream(sock)
diff --git a/contrib/python/httpcore/httpcore/_backends/trio.py b/contrib/python/httpcore/httpcore/_backends/trio.py
deleted file mode 100644
index 6f53f5f2a02..00000000000
--- a/contrib/python/httpcore/httpcore/_backends/trio.py
+++ /dev/null
@@ -1,159 +0,0 @@
-from __future__ import annotations
-
-import ssl
-import typing
-
-import trio
-
-from .._exceptions import (
- ConnectError,
- ConnectTimeout,
- ExceptionMapping,
- ReadError,
- ReadTimeout,
- WriteError,
- WriteTimeout,
- map_exceptions,
-)
-from .base import SOCKET_OPTION, AsyncNetworkBackend, AsyncNetworkStream
-
-
-class TrioStream(AsyncNetworkStream):
- def __init__(self, stream: trio.abc.Stream) -> None:
- self._stream = stream
-
- async def read(self, max_bytes: int, timeout: float | None = None) -> bytes:
- timeout_or_inf = float("inf") if timeout is None else timeout
- exc_map: ExceptionMapping = {
- trio.TooSlowError: ReadTimeout,
- trio.BrokenResourceError: ReadError,
- trio.ClosedResourceError: ReadError,
- }
- with map_exceptions(exc_map):
- with trio.fail_after(timeout_or_inf):
- data: bytes = await self._stream.receive_some(max_bytes=max_bytes)
- return data
-
- async def write(self, buffer: bytes, timeout: float | None = None) -> None:
- if not buffer:
- return
-
- timeout_or_inf = float("inf") if timeout is None else timeout
- exc_map: ExceptionMapping = {
- trio.TooSlowError: WriteTimeout,
- trio.BrokenResourceError: WriteError,
- trio.ClosedResourceError: WriteError,
- }
- with map_exceptions(exc_map):
- with trio.fail_after(timeout_or_inf):
- await self._stream.send_all(data=buffer)
-
- async def aclose(self) -> None:
- await self._stream.aclose()
-
- async def start_tls(
- self,
- ssl_context: ssl.SSLContext,
- server_hostname: str | None = None,
- timeout: float | None = None,
- ) -> AsyncNetworkStream:
- timeout_or_inf = float("inf") if timeout is None else timeout
- exc_map: ExceptionMapping = {
- trio.TooSlowError: ConnectTimeout,
- trio.BrokenResourceError: ConnectError,
- }
- ssl_stream = trio.SSLStream(
- self._stream,
- ssl_context=ssl_context,
- server_hostname=server_hostname,
- https_compatible=True,
- server_side=False,
- )
- with map_exceptions(exc_map):
- try:
- with trio.fail_after(timeout_or_inf):
- await ssl_stream.do_handshake()
- except Exception as exc: # pragma: nocover
- await self.aclose()
- raise exc
- return TrioStream(ssl_stream)
-
- def get_extra_info(self, info: str) -> typing.Any:
- if info == "ssl_object" and isinstance(self._stream, trio.SSLStream):
- # Type checkers cannot see `_ssl_object` attribute because trio._ssl.SSLStream uses __getattr__/__setattr__.
- # Tracked at https://github.com/python-trio/trio/issues/542
- return self._stream._ssl_object # type: ignore[attr-defined]
- if info == "client_addr":
- return self._get_socket_stream().socket.getsockname()
- if info == "server_addr":
- return self._get_socket_stream().socket.getpeername()
- if info == "socket":
- stream = self._stream
- while isinstance(stream, trio.SSLStream):
- stream = stream.transport_stream
- assert isinstance(stream, trio.SocketStream)
- return stream.socket
- if info == "is_readable":
- socket = self.get_extra_info("socket")
- return socket.is_readable()
- return None
-
- def _get_socket_stream(self) -> trio.SocketStream:
- stream = self._stream
- while isinstance(stream, trio.SSLStream):
- stream = stream.transport_stream
- assert isinstance(stream, trio.SocketStream)
- return stream
-
-
-class TrioBackend(AsyncNetworkBackend):
- async def connect_tcp(
- self,
- host: str,
- port: int,
- timeout: float | None = None,
- local_address: str | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> AsyncNetworkStream:
- # By default for TCP sockets, trio enables TCP_NODELAY.
- # https://trio.readthedocs.io/en/stable/reference-io.html#trio.SocketStream
- if socket_options is None:
- socket_options = [] # pragma: no cover
- timeout_or_inf = float("inf") if timeout is None else timeout
- exc_map: ExceptionMapping = {
- trio.TooSlowError: ConnectTimeout,
- trio.BrokenResourceError: ConnectError,
- OSError: ConnectError,
- }
- with map_exceptions(exc_map):
- with trio.fail_after(timeout_or_inf):
- stream: trio.abc.Stream = await trio.open_tcp_stream(
- host=host, port=port, local_address=local_address
- )
- for option in socket_options:
- stream.setsockopt(*option) # type: ignore[attr-defined] # pragma: no cover
- return TrioStream(stream)
-
- async def connect_unix_socket(
- self,
- path: str,
- timeout: float | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> AsyncNetworkStream: # pragma: nocover
- if socket_options is None:
- socket_options = []
- timeout_or_inf = float("inf") if timeout is None else timeout
- exc_map: ExceptionMapping = {
- trio.TooSlowError: ConnectTimeout,
- trio.BrokenResourceError: ConnectError,
- OSError: ConnectError,
- }
- with map_exceptions(exc_map):
- with trio.fail_after(timeout_or_inf):
- stream: trio.abc.Stream = await trio.open_unix_socket(path)
- for option in socket_options:
- stream.setsockopt(*option) # type: ignore[attr-defined] # pragma: no cover
- return TrioStream(stream)
-
- async def sleep(self, seconds: float) -> None:
- await trio.sleep(seconds) # pragma: nocover
diff --git a/contrib/python/httpcore/httpcore/_exceptions.py b/contrib/python/httpcore/httpcore/_exceptions.py
deleted file mode 100644
index bc28d44f55b..00000000000
--- a/contrib/python/httpcore/httpcore/_exceptions.py
+++ /dev/null
@@ -1,81 +0,0 @@
-import contextlib
-import typing
-
-ExceptionMapping = typing.Mapping[typing.Type[Exception], typing.Type[Exception]]
-
-
-def map_exceptions(map: ExceptionMapping) -> typing.Iterator[None]:
- try:
- yield
- except Exception as exc: # noqa: PIE786
- for from_exc, to_exc in map.items():
- if isinstance(exc, from_exc):
- raise to_exc(exc) from exc
- raise # pragma: nocover
-
-
-class ConnectionNotAvailable(Exception):
- pass
-
-
-class ProxyError(Exception):
- pass
-
-
-class UnsupportedProtocol(Exception):
- pass
-
-
-class ProtocolError(Exception):
- pass
-
-
-class RemoteProtocolError(ProtocolError):
- pass
-
-
-class LocalProtocolError(ProtocolError):
- pass
-
-
-# Timeout errors
-
-
-class TimeoutException(Exception):
- pass
-
-
-class PoolTimeout(TimeoutException):
- pass
-
-
-class ConnectTimeout(TimeoutException):
- pass
-
-
-class ReadTimeout(TimeoutException):
- pass
-
-
-class WriteTimeout(TimeoutException):
- pass
-
-
-# Network errors
-
-
-class NetworkError(Exception):
- pass
-
-
-class ConnectError(NetworkError):
- pass
-
-
-class ReadError(NetworkError):
- pass
-
-
-class WriteError(NetworkError):
- pass
diff --git a/contrib/python/httpcore/httpcore/_models.py b/contrib/python/httpcore/httpcore/_models.py
deleted file mode 100644
index 8a65f13347d..00000000000
--- a/contrib/python/httpcore/httpcore/_models.py
+++ /dev/null
@@ -1,516 +0,0 @@
-from __future__ import annotations
-
-import base64
-import ssl
-import typing
-import urllib.parse
-
-# Functions for typechecking...
-
-
-ByteOrStr = typing.Union[bytes, str]
-HeadersAsSequence = typing.Sequence[typing.Tuple[ByteOrStr, ByteOrStr]]
-HeadersAsMapping = typing.Mapping[ByteOrStr, ByteOrStr]
-HeaderTypes = typing.Union[HeadersAsSequence, HeadersAsMapping, None]
-
-Extensions = typing.MutableMapping[str, typing.Any]
-
-
-def enforce_bytes(value: bytes | str, *, name: str) -> bytes:
- """
- Any arguments that are ultimately represented as bytes can be specified
- either as bytes or as strings.
-
- However we enforce that any string arguments must only contain characters in
- the plain ASCII range. chr(0)...chr(127). If you need to use characters
- outside that range then be precise, and use a byte-wise argument.
- """
- if isinstance(value, str):
- try:
- return value.encode("ascii")
- except UnicodeEncodeError:
- raise TypeError(f"{name} strings may not include unicode characters.")
- elif isinstance(value, bytes):
- return value
-
- seen_type = type(value).__name__
- raise TypeError(f"{name} must be bytes or str, but got {seen_type}.")
-
-
-def enforce_url(value: URL | bytes | str, *, name: str) -> URL:
- """
- Type check for URL parameters.
- """
- if isinstance(value, (bytes, str)):
- return URL(value)
- elif isinstance(value, URL):
- return value
-
- seen_type = type(value).__name__
- raise TypeError(f"{name} must be a URL, bytes, or str, but got {seen_type}.")
-
-
-def enforce_headers(
- value: HeadersAsMapping | HeadersAsSequence | None = None, *, name: str
-) -> list[tuple[bytes, bytes]]:
- """
- Convienence function that ensure all items in request or response headers
- are either bytes or strings in the plain ASCII range.
- """
- if value is None:
- return []
- elif isinstance(value, typing.Mapping):
- return [
- (
- enforce_bytes(k, name="header name"),
- enforce_bytes(v, name="header value"),
- )
- for k, v in value.items()
- ]
- elif isinstance(value, typing.Sequence):
- return [
- (
- enforce_bytes(k, name="header name"),
- enforce_bytes(v, name="header value"),
- )
- for k, v in value
- ]
-
- seen_type = type(value).__name__
- raise TypeError(
- f"{name} must be a mapping or sequence of two-tuples, but got {seen_type}."
- )
-
-
-def enforce_stream(
- value: bytes | typing.Iterable[bytes] | typing.AsyncIterable[bytes] | None,
- *,
- name: str,
-) -> typing.Iterable[bytes] | typing.AsyncIterable[bytes]:
- if value is None:
- return ByteStream(b"")
- elif isinstance(value, bytes):
- return ByteStream(value)
- return value
-
-
-# * https://tools.ietf.org/html/rfc3986#section-3.2.3
-# * https://url.spec.whatwg.org/#url-miscellaneous
-# * https://url.spec.whatwg.org/#scheme-state
-DEFAULT_PORTS = {
- b"ftp": 21,
- b"http": 80,
- b"https": 443,
- b"ws": 80,
- b"wss": 443,
-}
-
-
-def include_request_headers(
- headers: list[tuple[bytes, bytes]],
- *,
- url: "URL",
- content: None | bytes | typing.Iterable[bytes] | typing.AsyncIterable[bytes],
-) -> list[tuple[bytes, bytes]]:
- headers_set = set(k.lower() for k, v in headers)
-
- if b"host" not in headers_set:
- default_port = DEFAULT_PORTS.get(url.scheme)
- if url.port is None or url.port == default_port:
- header_value = url.host
- else:
- header_value = b"%b:%d" % (url.host, url.port)
- headers = [(b"Host", header_value)] + headers
-
- if (
- content is not None
- and b"content-length" not in headers_set
- and b"transfer-encoding" not in headers_set
- ):
- if isinstance(content, bytes):
- content_length = str(len(content)).encode("ascii")
- headers += [(b"Content-Length", content_length)]
- else:
- headers += [(b"Transfer-Encoding", b"chunked")] # pragma: nocover
-
- return headers
-
-
-# Interfaces for byte streams...
-
-
-class ByteStream:
- """
- A container for non-streaming content, and that supports both sync and async
- stream iteration.
- """
-
- def __init__(self, content: bytes) -> None:
- self._content = content
-
- def __iter__(self) -> typing.Iterator[bytes]:
- yield self._content
-
- async def __aiter__(self) -> typing.AsyncIterator[bytes]:
- yield self._content
-
- def __repr__(self) -> str:
- return f"<{self.__class__.__name__} [{len(self._content)} bytes]>"
-
-
-class Origin:
- def __init__(self, scheme: bytes, host: bytes, port: int) -> None:
- self.scheme = scheme
- self.host = host
- self.port = port
-
- def __eq__(self, other: typing.Any) -> bool:
- return (
- isinstance(other, Origin)
- and self.scheme == other.scheme
- and self.host == other.host
- and self.port == other.port
- )
-
- def __str__(self) -> str:
- scheme = self.scheme.decode("ascii")
- host = self.host.decode("ascii")
- port = str(self.port)
- return f"{scheme}://{host}:{port}"
-
-
-class URL:
- """
- Represents the URL against which an HTTP request may be made.
-
- The URL may either be specified as a plain string, for convienence:
-
- ```python
- url = httpcore.URL("https://www.example.com/")
- ```
-
- Or be constructed with explicitily pre-parsed components:
-
- ```python
- url = httpcore.URL(scheme=b'https', host=b'www.example.com', port=None, target=b'/')
- ```
-
- Using this second more explicit style allows integrations that are using
- `httpcore` to pass through URLs that have already been parsed in order to use
- libraries such as `rfc-3986` rather than relying on the stdlib. It also ensures
- that URL parsing is treated identically at both the networking level and at any
- higher layers of abstraction.
-
- The four components are important here, as they allow the URL to be precisely
- specified in a pre-parsed format. They also allow certain types of request to
- be created that could not otherwise be expressed.
-
- For example, an HTTP request to `http://www.example.com/` forwarded via a proxy
- at `http://localhost:8080`...
-
- ```python
- # Constructs an HTTP request with a complete URL as the target:
- # GET https://www.example.com/ HTTP/1.1
- url = httpcore.URL(
- scheme=b'http',
- host=b'localhost',
- port=8080,
- target=b'https://www.example.com/'
- )
- request = httpcore.Request(
- method="GET",
- url=url
- )
- ```
-
- Another example is constructing an `OPTIONS *` request...
-
- ```python
- # Constructs an 'OPTIONS *' HTTP request:
- # OPTIONS * HTTP/1.1
- url = httpcore.URL(scheme=b'https', host=b'www.example.com', target=b'*')
- request = httpcore.Request(method="OPTIONS", url=url)
- ```
-
- This kind of request is not possible to formulate with a URL string,
- because the `/` delimiter is always used to demark the target from the
- host/port portion of the URL.
-
- For convenience, string-like arguments may be specified either as strings or
- as bytes. However, once a request is being issue over-the-wire, the URL
- components are always ultimately required to be a bytewise representation.
-
- In order to avoid any ambiguity over character encodings, when strings are used
- as arguments, they must be strictly limited to the ASCII range `chr(0)`-`chr(127)`.
- If you require a bytewise representation that is outside this range you must
- handle the character encoding directly, and pass a bytes instance.
- """
-
- def __init__(
- self,
- url: bytes | str = "",
- *,
- scheme: bytes | str = b"",
- host: bytes | str = b"",
- port: int | None = None,
- target: bytes | str = b"",
- ) -> None:
- """
- Parameters:
- url: The complete URL as a string or bytes.
- scheme: The URL scheme as a string or bytes.
- Typically either `"http"` or `"https"`.
- host: The URL host as a string or bytes. Such as `"www.example.com"`.
- port: The port to connect to. Either an integer or `None`.
- target: The target of the HTTP request. Such as `"/items?search=red"`.
- """
- if url:
- parsed = urllib.parse.urlparse(enforce_bytes(url, name="url"))
- self.scheme = parsed.scheme
- self.host = parsed.hostname or b""
- self.port = parsed.port
- self.target = (parsed.path or b"/") + (
- b"?" + parsed.query if parsed.query else b""
- )
- else:
- self.scheme = enforce_bytes(scheme, name="scheme")
- self.host = enforce_bytes(host, name="host")
- self.port = port
- self.target = enforce_bytes(target, name="target")
-
- @property
- def origin(self) -> Origin:
- default_port = {
- b"http": 80,
- b"https": 443,
- b"ws": 80,
- b"wss": 443,
- b"socks5": 1080,
- b"socks5h": 1080,
- }[self.scheme]
- return Origin(
- scheme=self.scheme, host=self.host, port=self.port or default_port
- )
-
- def __eq__(self, other: typing.Any) -> bool:
- return (
- isinstance(other, URL)
- and other.scheme == self.scheme
- and other.host == self.host
- and other.port == self.port
- and other.target == self.target
- )
-
- def __bytes__(self) -> bytes:
- if self.port is None:
- return b"%b://%b%b" % (self.scheme, self.host, self.target)
- return b"%b://%b:%d%b" % (self.scheme, self.host, self.port, self.target)
-
- def __repr__(self) -> str:
- return (
- f"{self.__class__.__name__}(scheme={self.scheme!r}, "
- f"host={self.host!r}, port={self.port!r}, target={self.target!r})"
- )
-
-
-class Request:
- """
- An HTTP request.
- """
-
- def __init__(
- self,
- method: bytes | str,
- url: URL | bytes | str,
- *,
- headers: HeaderTypes = None,
- content: bytes
- | typing.Iterable[bytes]
- | typing.AsyncIterable[bytes]
- | None = None,
- extensions: Extensions | None = None,
- ) -> None:
- """
- Parameters:
- method: The HTTP request method, either as a string or bytes.
- For example: `GET`.
- url: The request URL, either as a `URL` instance, or as a string or bytes.
- For example: `"https://www.example.com".`
- headers: The HTTP request headers.
- content: The content of the request body.
- extensions: A dictionary of optional extra information included on
- the request. Possible keys include `"timeout"`, and `"trace"`.
- """
- self.method: bytes = enforce_bytes(method, name="method")
- self.url: URL = enforce_url(url, name="url")
- self.headers: list[tuple[bytes, bytes]] = enforce_headers(
- headers, name="headers"
- )
- self.stream: typing.Iterable[bytes] | typing.AsyncIterable[bytes] = (
- enforce_stream(content, name="content")
- )
- self.extensions = {} if extensions is None else extensions
-
- if "target" in self.extensions:
- self.url = URL(
- scheme=self.url.scheme,
- host=self.url.host,
- port=self.url.port,
- target=self.extensions["target"],
- )
-
- def __repr__(self) -> str:
- return f"<{self.__class__.__name__} [{self.method!r}]>"
-
-
-class Response:
- """
- An HTTP response.
- """
-
- def __init__(
- self,
- status: int,
- *,
- headers: HeaderTypes = None,
- content: bytes
- | typing.Iterable[bytes]
- | typing.AsyncIterable[bytes]
- | None = None,
- extensions: Extensions | None = None,
- ) -> None:
- """
- Parameters:
- status: The HTTP status code of the response. For example `200`.
- headers: The HTTP response headers.
- content: The content of the response body.
- extensions: A dictionary of optional extra information included on
- the responseself.Possible keys include `"http_version"`,
- `"reason_phrase"`, and `"network_stream"`.
- """
- self.status: int = status
- self.headers: list[tuple[bytes, bytes]] = enforce_headers(
- headers, name="headers"
- )
- self.stream: typing.Iterable[bytes] | typing.AsyncIterable[bytes] = (
- enforce_stream(content, name="content")
- )
- self.extensions = {} if extensions is None else extensions
-
- self._stream_consumed = False
-
- @property
- def content(self) -> bytes:
- if not hasattr(self, "_content"):
- if isinstance(self.stream, typing.Iterable):
- raise RuntimeError(
- "Attempted to access 'response.content' on a streaming response. "
- "Call 'response.read()' first."
- )
- else:
- raise RuntimeError(
- "Attempted to access 'response.content' on a streaming response. "
- "Call 'await response.aread()' first."
- )
- return self._content
-
- def __repr__(self) -> str:
- return f"<{self.__class__.__name__} [{self.status}]>"
-
- # Sync interface...
-
- def read(self) -> bytes:
- if not isinstance(self.stream, typing.Iterable): # pragma: nocover
- raise RuntimeError(
- "Attempted to read an asynchronous response using 'response.read()'. "
- "You should use 'await response.aread()' instead."
- )
- if not hasattr(self, "_content"):
- self._content = b"".join([part for part in self.iter_stream()])
- return self._content
-
- def iter_stream(self) -> typing.Iterator[bytes]:
- if not isinstance(self.stream, typing.Iterable): # pragma: nocover
- raise RuntimeError(
- "Attempted to stream an asynchronous response using 'for ... in "
- "response.iter_stream()'. "
- "You should use 'async for ... in response.aiter_stream()' instead."
- )
- if self._stream_consumed:
- raise RuntimeError(
- "Attempted to call 'for ... in response.iter_stream()' more than once."
- )
- self._stream_consumed = True
- for chunk in self.stream:
- yield chunk
-
- def close(self) -> None:
- if not isinstance(self.stream, typing.Iterable): # pragma: nocover
- raise RuntimeError(
- "Attempted to close an asynchronous response using 'response.close()'. "
- "You should use 'await response.aclose()' instead."
- )
- if hasattr(self.stream, "close"):
- self.stream.close()
-
- # Async interface...
-
- async def aread(self) -> bytes:
- if not isinstance(self.stream, typing.AsyncIterable): # pragma: nocover
- raise RuntimeError(
- "Attempted to read an synchronous response using "
- "'await response.aread()'. "
- "You should use 'response.read()' instead."
- )
- if not hasattr(self, "_content"):
- self._content = b"".join([part async for part in self.aiter_stream()])
- return self._content
-
- async def aiter_stream(self) -> typing.AsyncIterator[bytes]:
- if not isinstance(self.stream, typing.AsyncIterable): # pragma: nocover
- raise RuntimeError(
- "Attempted to stream an synchronous response using 'async for ... in "
- "response.aiter_stream()'. "
- "You should use 'for ... in response.iter_stream()' instead."
- )
- if self._stream_consumed:
- raise RuntimeError(
- "Attempted to call 'async for ... in response.aiter_stream()' "
- "more than once."
- )
- self._stream_consumed = True
- async for chunk in self.stream:
- yield chunk
-
- async def aclose(self) -> None:
- if not isinstance(self.stream, typing.AsyncIterable): # pragma: nocover
- raise RuntimeError(
- "Attempted to close a synchronous response using "
- "'await response.aclose()'. "
- "You should use 'response.close()' instead."
- )
- if hasattr(self.stream, "aclose"):
- await self.stream.aclose()
-
-
-class Proxy:
- def __init__(
- self,
- url: URL | bytes | str,
- auth: tuple[bytes | str, bytes | str] | None = None,
- headers: HeadersAsMapping | HeadersAsSequence | None = None,
- ssl_context: ssl.SSLContext | None = None,
- ):
- self.url = enforce_url(url, name="url")
- self.headers = enforce_headers(headers, name="headers")
- self.ssl_context = ssl_context
-
- if auth is not None:
- username = enforce_bytes(auth[0], name="auth")
- password = enforce_bytes(auth[1], name="auth")
- userpass = username + b":" + password
- authorization = b"Basic " + base64.b64encode(userpass)
- self.auth: tuple[bytes, bytes] | None = (username, password)
- self.headers = [(b"Proxy-Authorization", authorization)] + self.headers
- else:
- self.auth = None
diff --git a/contrib/python/httpcore/httpcore/_ssl.py b/contrib/python/httpcore/httpcore/_ssl.py
deleted file mode 100644
index c99c5a67945..00000000000
--- a/contrib/python/httpcore/httpcore/_ssl.py
+++ /dev/null
@@ -1,9 +0,0 @@
-import ssl
-
-import certifi
-
-
-def default_ssl_context() -> ssl.SSLContext:
- context = ssl.create_default_context()
- context.load_verify_locations(certifi.where())
- return context
diff --git a/contrib/python/httpcore/httpcore/_sync/__init__.py b/contrib/python/httpcore/httpcore/_sync/__init__.py
deleted file mode 100644
index b476d76d9a7..00000000000
--- a/contrib/python/httpcore/httpcore/_sync/__init__.py
+++ /dev/null
@@ -1,39 +0,0 @@
-from .connection import HTTPConnection
-from .connection_pool import ConnectionPool
-from .http11 import HTTP11Connection
-from .http_proxy import HTTPProxy
-from .interfaces import ConnectionInterface
-
-try:
- from .http2 import HTTP2Connection
-except ImportError: # pragma: nocover
-
- class HTTP2Connection: # type: ignore
- def __init__(self, *args, **kwargs) -> None: # type: ignore
- raise RuntimeError(
- "Attempted to use http2 support, but the `h2` package is not "
- "installed. Use 'pip install httpcore[http2]'."
- )
-
-
-try:
- from .socks_proxy import SOCKSProxy
-except ImportError: # pragma: nocover
-
- class SOCKSProxy: # type: ignore
- def __init__(self, *args, **kwargs) -> None: # type: ignore
- raise RuntimeError(
- "Attempted to use SOCKS support, but the `socksio` package is not "
- "installed. Use 'pip install httpcore[socks]'."
- )
-
-
-__all__ = [
- "HTTPConnection",
- "ConnectionPool",
- "HTTPProxy",
- "HTTP11Connection",
- "HTTP2Connection",
- "ConnectionInterface",
- "SOCKSProxy",
-]
diff --git a/contrib/python/httpcore/httpcore/_sync/connection.py b/contrib/python/httpcore/httpcore/_sync/connection.py
deleted file mode 100644
index 363f8be819d..00000000000
--- a/contrib/python/httpcore/httpcore/_sync/connection.py
+++ /dev/null
@@ -1,222 +0,0 @@
-from __future__ import annotations
-
-import itertools
-import logging
-import ssl
-import types
-import typing
-
-from .._backends.sync import SyncBackend
-from .._backends.base import SOCKET_OPTION, NetworkBackend, NetworkStream
-from .._exceptions import ConnectError, ConnectTimeout
-from .._models import Origin, Request, Response
-from .._ssl import default_ssl_context
-from .._synchronization import Lock
-from .._trace import Trace
-from .http11 import HTTP11Connection
-from .interfaces import ConnectionInterface
-
-RETRIES_BACKOFF_FACTOR = 0.5 # 0s, 0.5s, 1s, 2s, 4s, etc.
-
-
-logger = logging.getLogger("httpcore.connection")
-
-
-def exponential_backoff(factor: float) -> typing.Iterator[float]:
- """
- Generate a geometric sequence that has a ratio of 2 and starts with 0.
-
- For example:
- - `factor = 2`: `0, 2, 4, 8, 16, 32, 64, ...`
- - `factor = 3`: `0, 3, 6, 12, 24, 48, 96, ...`
- """
- yield 0
- for n in itertools.count():
- yield factor * 2**n
-
-
-class HTTPConnection(ConnectionInterface):
- def __init__(
- self,
- origin: Origin,
- ssl_context: ssl.SSLContext | None = None,
- keepalive_expiry: float | None = None,
- http1: bool = True,
- http2: bool = False,
- retries: int = 0,
- local_address: str | None = None,
- uds: str | None = None,
- network_backend: NetworkBackend | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> None:
- self._origin = origin
- self._ssl_context = ssl_context
- self._keepalive_expiry = keepalive_expiry
- self._http1 = http1
- self._http2 = http2
- self._retries = retries
- self._local_address = local_address
- self._uds = uds
-
- self._network_backend: NetworkBackend = (
- SyncBackend() if network_backend is None else network_backend
- )
- self._connection: ConnectionInterface | None = None
- self._connect_failed: bool = False
- self._request_lock = Lock()
- self._socket_options = socket_options
-
- def handle_request(self, request: Request) -> Response:
- if not self.can_handle_request(request.url.origin):
- raise RuntimeError(
- f"Attempted to send request to {request.url.origin} on connection to {self._origin}"
- )
-
- try:
- with self._request_lock:
- if self._connection is None:
- stream = self._connect(request)
-
- ssl_object = stream.get_extra_info("ssl_object")
- http2_negotiated = (
- ssl_object is not None
- and ssl_object.selected_alpn_protocol() == "h2"
- )
- if http2_negotiated or (self._http2 and not self._http1):
- from .http2 import HTTP2Connection
-
- self._connection = HTTP2Connection(
- origin=self._origin,
- stream=stream,
- keepalive_expiry=self._keepalive_expiry,
- )
- else:
- self._connection = HTTP11Connection(
- origin=self._origin,
- stream=stream,
- keepalive_expiry=self._keepalive_expiry,
- )
- except BaseException as exc:
- self._connect_failed = True
- raise exc
-
- return self._connection.handle_request(request)
-
- def _connect(self, request: Request) -> NetworkStream:
- timeouts = request.extensions.get("timeout", {})
- sni_hostname = request.extensions.get("sni_hostname", None)
- timeout = timeouts.get("connect", None)
-
- retries_left = self._retries
- delays = exponential_backoff(factor=RETRIES_BACKOFF_FACTOR)
-
- while True:
- try:
- if self._uds is None:
- kwargs = {
- "host": self._origin.host.decode("ascii"),
- "port": self._origin.port,
- "local_address": self._local_address,
- "timeout": timeout,
- "socket_options": self._socket_options,
- }
- with Trace("connect_tcp", logger, request, kwargs) as trace:
- stream = self._network_backend.connect_tcp(**kwargs)
- trace.return_value = stream
- else:
- kwargs = {
- "path": self._uds,
- "timeout": timeout,
- "socket_options": self._socket_options,
- }
- with Trace(
- "connect_unix_socket", logger, request, kwargs
- ) as trace:
- stream = self._network_backend.connect_unix_socket(
- **kwargs
- )
- trace.return_value = stream
-
- if self._origin.scheme in (b"https", b"wss"):
- ssl_context = (
- default_ssl_context()
- if self._ssl_context is None
- else self._ssl_context
- )
- alpn_protocols = ["http/1.1", "h2"] if self._http2 else ["http/1.1"]
- ssl_context.set_alpn_protocols(alpn_protocols)
-
- kwargs = {
- "ssl_context": ssl_context,
- "server_hostname": sni_hostname
- or self._origin.host.decode("ascii"),
- "timeout": timeout,
- }
- with Trace("start_tls", logger, request, kwargs) as trace:
- stream = stream.start_tls(**kwargs)
- trace.return_value = stream
- return stream
- except (ConnectError, ConnectTimeout):
- if retries_left <= 0:
- raise
- retries_left -= 1
- delay = next(delays)
- with Trace("retry", logger, request, kwargs) as trace:
- self._network_backend.sleep(delay)
-
- def can_handle_request(self, origin: Origin) -> bool:
- return origin == self._origin
-
- def close(self) -> None:
- if self._connection is not None:
- with Trace("close", logger, None, {}):
- self._connection.close()
-
- def is_available(self) -> bool:
- if self._connection is None:
- # If HTTP/2 support is enabled, and the resulting connection could
- # end up as HTTP/2 then we should indicate the connection as being
- # available to service multiple requests.
- return (
- self._http2
- and (self._origin.scheme == b"https" or not self._http1)
- and not self._connect_failed
- )
- return self._connection.is_available()
-
- def has_expired(self) -> bool:
- if self._connection is None:
- return self._connect_failed
- return self._connection.has_expired()
-
- def is_idle(self) -> bool:
- if self._connection is None:
- return self._connect_failed
- return self._connection.is_idle()
-
- def is_closed(self) -> bool:
- if self._connection is None:
- return self._connect_failed
- return self._connection.is_closed()
-
- def info(self) -> str:
- if self._connection is None:
- return "CONNECTION FAILED" if self._connect_failed else "CONNECTING"
- return self._connection.info()
-
- def __repr__(self) -> str:
- return f"<{self.__class__.__name__} [{self.info()}]>"
-
- # These context managers are not used in the standard flow, but are
- # useful for testing or working with connection instances directly.
-
- def __enter__(self) -> HTTPConnection:
- return self
-
- def __exit__(
- self,
- exc_type: type[BaseException] | None = None,
- exc_value: BaseException | None = None,
- traceback: types.TracebackType | None = None,
- ) -> None:
- self.close()
diff --git a/contrib/python/httpcore/httpcore/_sync/connection_pool.py b/contrib/python/httpcore/httpcore/_sync/connection_pool.py
deleted file mode 100644
index 9ccfa53e597..00000000000
--- a/contrib/python/httpcore/httpcore/_sync/connection_pool.py
+++ /dev/null
@@ -1,420 +0,0 @@
-from __future__ import annotations
-
-import ssl
-import sys
-import types
-import typing
-
-from .._backends.sync import SyncBackend
-from .._backends.base import SOCKET_OPTION, NetworkBackend
-from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
-from .._models import Origin, Proxy, Request, Response
-from .._synchronization import Event, ShieldCancellation, ThreadLock
-from .connection import HTTPConnection
-from .interfaces import ConnectionInterface, RequestInterface
-
-
-class PoolRequest:
- def __init__(self, request: Request) -> None:
- self.request = request
- self.connection: ConnectionInterface | None = None
- self._connection_acquired = Event()
-
- def assign_to_connection(self, connection: ConnectionInterface | None) -> None:
- self.connection = connection
- self._connection_acquired.set()
-
- def clear_connection(self) -> None:
- self.connection = None
- self._connection_acquired = Event()
-
- def wait_for_connection(
- self, timeout: float | None = None
- ) -> ConnectionInterface:
- if self.connection is None:
- self._connection_acquired.wait(timeout=timeout)
- assert self.connection is not None
- return self.connection
-
- def is_queued(self) -> bool:
- return self.connection is None
-
-
-class ConnectionPool(RequestInterface):
- """
- A connection pool for making HTTP requests.
- """
-
- def __init__(
- self,
- ssl_context: ssl.SSLContext | None = None,
- proxy: Proxy | None = None,
- max_connections: int | None = 10,
- max_keepalive_connections: int | None = None,
- keepalive_expiry: float | None = None,
- http1: bool = True,
- http2: bool = False,
- retries: int = 0,
- local_address: str | None = None,
- uds: str | None = None,
- network_backend: NetworkBackend | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> None:
- """
- A connection pool for making HTTP requests.
-
- Parameters:
- ssl_context: An SSL context to use for verifying connections.
- If not specified, the default `httpcore.default_ssl_context()`
- will be used.
- max_connections: The maximum number of concurrent HTTP connections that
- the pool should allow. Any attempt to send a request on a pool that
- would exceed this amount will block until a connection is available.
- max_keepalive_connections: The maximum number of idle HTTP connections
- that will be maintained in the pool.
- keepalive_expiry: The duration in seconds that an idle HTTP connection
- may be maintained for before being expired from the pool.
- http1: A boolean indicating if HTTP/1.1 requests should be supported
- by the connection pool. Defaults to True.
- http2: A boolean indicating if HTTP/2 requests should be supported by
- the connection pool. Defaults to False.
- retries: The maximum number of retries when trying to establish a
- connection.
- local_address: Local address to connect from. Can also be used to connect
- using a particular address family. Using `local_address="0.0.0.0"`
- will connect using an `AF_INET` address (IPv4), while using
- `local_address="::"` will connect using an `AF_INET6` address (IPv6).
- uds: Path to a Unix Domain Socket to use instead of TCP sockets.
- network_backend: A backend instance to use for handling network I/O.
- socket_options: Socket options that have to be included
- in the TCP socket when the connection was established.
- """
- self._ssl_context = ssl_context
- self._proxy = proxy
- self._max_connections = (
- sys.maxsize if max_connections is None else max_connections
- )
- self._max_keepalive_connections = (
- sys.maxsize
- if max_keepalive_connections is None
- else max_keepalive_connections
- )
- self._max_keepalive_connections = min(
- self._max_connections, self._max_keepalive_connections
- )
-
- self._keepalive_expiry = keepalive_expiry
- self._http1 = http1
- self._http2 = http2
- self._retries = retries
- self._local_address = local_address
- self._uds = uds
-
- self._network_backend = (
- SyncBackend() if network_backend is None else network_backend
- )
- self._socket_options = socket_options
-
- # The mutable state on a connection pool is the queue of incoming requests,
- # and the set of connections that are servicing those requests.
- self._connections: list[ConnectionInterface] = []
- self._requests: list[PoolRequest] = []
-
- # We only mutate the state of the connection pool within an 'optional_thread_lock'
- # context. This holds a threading lock unless we're running in async mode,
- # in which case it is a no-op.
- self._optional_thread_lock = ThreadLock()
-
- def create_connection(self, origin: Origin) -> ConnectionInterface:
- if self._proxy is not None:
- if self._proxy.url.scheme in (b"socks5", b"socks5h"):
- from .socks_proxy import Socks5Connection
-
- return Socks5Connection(
- proxy_origin=self._proxy.url.origin,
- proxy_auth=self._proxy.auth,
- remote_origin=origin,
- ssl_context=self._ssl_context,
- keepalive_expiry=self._keepalive_expiry,
- http1=self._http1,
- http2=self._http2,
- network_backend=self._network_backend,
- )
- elif origin.scheme == b"http":
- from .http_proxy import ForwardHTTPConnection
-
- return ForwardHTTPConnection(
- proxy_origin=self._proxy.url.origin,
- proxy_headers=self._proxy.headers,
- proxy_ssl_context=self._proxy.ssl_context,
- remote_origin=origin,
- keepalive_expiry=self._keepalive_expiry,
- network_backend=self._network_backend,
- )
- from .http_proxy import TunnelHTTPConnection
-
- return TunnelHTTPConnection(
- proxy_origin=self._proxy.url.origin,
- proxy_headers=self._proxy.headers,
- proxy_ssl_context=self._proxy.ssl_context,
- remote_origin=origin,
- ssl_context=self._ssl_context,
- keepalive_expiry=self._keepalive_expiry,
- http1=self._http1,
- http2=self._http2,
- network_backend=self._network_backend,
- )
-
- return HTTPConnection(
- origin=origin,
- ssl_context=self._ssl_context,
- keepalive_expiry=self._keepalive_expiry,
- http1=self._http1,
- http2=self._http2,
- retries=self._retries,
- local_address=self._local_address,
- uds=self._uds,
- network_backend=self._network_backend,
- socket_options=self._socket_options,
- )
-
- @property
- def connections(self) -> list[ConnectionInterface]:
- """
- Return a list of the connections currently in the pool.
-
- For example:
-
- ```python
- >>> pool.connections
- [
- <HTTPConnection ['https://example.com:443', HTTP/1.1, ACTIVE, Request Count: 6]>,
- <HTTPConnection ['https://example.com:443', HTTP/1.1, IDLE, Request Count: 9]> ,
- <HTTPConnection ['http://example.com:80', HTTP/1.1, IDLE, Request Count: 1]>,
- ]
- ```
- """
- return list(self._connections)
-
- def handle_request(self, request: Request) -> Response:
- """
- Send an HTTP request, and return an HTTP response.
-
- This is the core implementation that is called into by `.request()` or `.stream()`.
- """
- scheme = request.url.scheme.decode()
- if scheme == "":
- raise UnsupportedProtocol(
- "Request URL is missing an 'http://' or 'https://' protocol."
- )
- if scheme not in ("http", "https", "ws", "wss"):
- raise UnsupportedProtocol(
- f"Request URL has an unsupported protocol '{scheme}://'."
- )
-
- timeouts = request.extensions.get("timeout", {})
- timeout = timeouts.get("pool", None)
-
- with self._optional_thread_lock:
- # Add the incoming request to our request queue.
- pool_request = PoolRequest(request)
- self._requests.append(pool_request)
-
- try:
- while True:
- with self._optional_thread_lock:
- # Assign incoming requests to available connections,
- # closing or creating new connections as required.
- closing = self._assign_requests_to_connections()
- self._close_connections(closing)
-
- # Wait until this request has an assigned connection.
- connection = pool_request.wait_for_connection(timeout=timeout)
-
- try:
- # Send the request on the assigned connection.
- response = connection.handle_request(
- pool_request.request
- )
- except ConnectionNotAvailable:
- # In some cases a connection may initially be available to
- # handle a request, but then become unavailable.
- #
- # In this case we clear the connection and try again.
- pool_request.clear_connection()
- else:
- break # pragma: nocover
-
- except BaseException as exc:
- with self._optional_thread_lock:
- # For any exception or cancellation we remove the request from
- # the queue, and then re-assign requests to connections.
- self._requests.remove(pool_request)
- closing = self._assign_requests_to_connections()
-
- self._close_connections(closing)
- raise exc from None
-
- # Return the response. Note that in this case we still have to manage
- # the point at which the response is closed.
- assert isinstance(response.stream, typing.Iterable)
- return Response(
- status=response.status,
- headers=response.headers,
- content=PoolByteStream(
- stream=response.stream, pool_request=pool_request, pool=self
- ),
- extensions=response.extensions,
- )
-
- def _assign_requests_to_connections(self) -> list[ConnectionInterface]:
- """
- Manage the state of the connection pool, assigning incoming
- requests to connections as available.
-
- Called whenever a new request is added or removed from the pool.
-
- Any closing connections are returned, allowing the I/O for closing
- those connections to be handled seperately.
- """
- closing_connections = []
-
- # First we handle cleaning up any connections that are closed,
- # have expired their keep-alive, or surplus idle connections.
- for connection in list(self._connections):
- if connection.is_closed():
- # log: "removing closed connection"
- self._connections.remove(connection)
- elif connection.has_expired():
- # log: "closing expired connection"
- self._connections.remove(connection)
- closing_connections.append(connection)
- elif (
- connection.is_idle()
- and len([connection.is_idle() for connection in self._connections])
- > self._max_keepalive_connections
- ):
- # log: "closing idle connection"
- self._connections.remove(connection)
- closing_connections.append(connection)
-
- # Assign queued requests to connections.
- queued_requests = [request for request in self._requests if request.is_queued()]
- for pool_request in queued_requests:
- origin = pool_request.request.url.origin
- available_connections = [
- connection
- for connection in self._connections
- if connection.can_handle_request(origin) and connection.is_available()
- ]
- idle_connections = [
- connection for connection in self._connections if connection.is_idle()
- ]
-
- # There are three cases for how we may be able to handle the request:
- #
- # 1. There is an existing connection that can handle the request.
- # 2. We can create a new connection to handle the request.
- # 3. We can close an idle connection and then create a new connection
- # to handle the request.
- if available_connections:
- # log: "reusing existing connection"
- connection = available_connections[0]
- pool_request.assign_to_connection(connection)
- elif len(self._connections) < self._max_connections:
- # log: "creating new connection"
- connection = self.create_connection(origin)
- self._connections.append(connection)
- pool_request.assign_to_connection(connection)
- elif idle_connections:
- # log: "closing idle connection"
- connection = idle_connections[0]
- self._connections.remove(connection)
- closing_connections.append(connection)
- # log: "creating new connection"
- connection = self.create_connection(origin)
- self._connections.append(connection)
- pool_request.assign_to_connection(connection)
-
- return closing_connections
-
- def _close_connections(self, closing: list[ConnectionInterface]) -> None:
- # Close connections which have been removed from the pool.
- with ShieldCancellation():
- for connection in closing:
- connection.close()
-
- def close(self) -> None:
- # Explicitly close the connection pool.
- # Clears all existing requests and connections.
- with self._optional_thread_lock:
- closing_connections = list(self._connections)
- self._connections = []
- self._close_connections(closing_connections)
-
- def __enter__(self) -> ConnectionPool:
- return self
-
- def __exit__(
- self,
- exc_type: type[BaseException] | None = None,
- exc_value: BaseException | None = None,
- traceback: types.TracebackType | None = None,
- ) -> None:
- self.close()
-
- def __repr__(self) -> str:
- class_name = self.__class__.__name__
- with self._optional_thread_lock:
- request_is_queued = [request.is_queued() for request in self._requests]
- connection_is_idle = [
- connection.is_idle() for connection in self._connections
- ]
-
- num_active_requests = request_is_queued.count(False)
- num_queued_requests = request_is_queued.count(True)
- num_active_connections = connection_is_idle.count(False)
- num_idle_connections = connection_is_idle.count(True)
-
- requests_info = (
- f"Requests: {num_active_requests} active, {num_queued_requests} queued"
- )
- connection_info = (
- f"Connections: {num_active_connections} active, {num_idle_connections} idle"
- )
-
- return f"<{class_name} [{requests_info} | {connection_info}]>"
-
-
-class PoolByteStream:
- def __init__(
- self,
- stream: typing.Iterable[bytes],
- pool_request: PoolRequest,
- pool: ConnectionPool,
- ) -> None:
- self._stream = stream
- self._pool_request = pool_request
- self._pool = pool
- self._closed = False
-
- def __iter__(self) -> typing.Iterator[bytes]:
- try:
- for part in self._stream:
- yield part
- except BaseException as exc:
- self.close()
- raise exc from None
-
- def close(self) -> None:
- if not self._closed:
- self._closed = True
- with ShieldCancellation():
- if hasattr(self._stream, "close"):
- self._stream.close()
-
- with self._pool._optional_thread_lock:
- self._pool._requests.remove(self._pool_request)
- closing = self._pool._assign_requests_to_connections()
-
- self._pool._close_connections(closing)
diff --git a/contrib/python/httpcore/httpcore/_sync/http11.py b/contrib/python/httpcore/httpcore/_sync/http11.py
deleted file mode 100644
index ebd3a97480c..00000000000
--- a/contrib/python/httpcore/httpcore/_sync/http11.py
+++ /dev/null
@@ -1,379 +0,0 @@
-from __future__ import annotations
-
-import enum
-import logging
-import ssl
-import time
-import types
-import typing
-
-import h11
-
-from .._backends.base import NetworkStream
-from .._exceptions import (
- ConnectionNotAvailable,
- LocalProtocolError,
- RemoteProtocolError,
- WriteError,
- map_exceptions,
-)
-from .._models import Origin, Request, Response
-from .._synchronization import Lock, ShieldCancellation
-from .._trace import Trace
-from .interfaces import ConnectionInterface
-
-logger = logging.getLogger("httpcore.http11")
-
-
-# A subset of `h11.Event` types supported by `_send_event`
-H11SendEvent = typing.Union[
- h11.Request,
- h11.Data,
- h11.EndOfMessage,
-]
-
-
-class HTTPConnectionState(enum.IntEnum):
- NEW = 0
- ACTIVE = 1
- IDLE = 2
- CLOSED = 3
-
-
-class HTTP11Connection(ConnectionInterface):
- READ_NUM_BYTES = 64 * 1024
- MAX_INCOMPLETE_EVENT_SIZE = 100 * 1024
-
- def __init__(
- self,
- origin: Origin,
- stream: NetworkStream,
- keepalive_expiry: float | None = None,
- ) -> None:
- self._origin = origin
- self._network_stream = stream
- self._keepalive_expiry: float | None = keepalive_expiry
- self._expire_at: float | None = None
- self._state = HTTPConnectionState.NEW
- self._state_lock = Lock()
- self._request_count = 0
- self._h11_state = h11.Connection(
- our_role=h11.CLIENT,
- max_incomplete_event_size=self.MAX_INCOMPLETE_EVENT_SIZE,
- )
-
- def handle_request(self, request: Request) -> Response:
- if not self.can_handle_request(request.url.origin):
- raise RuntimeError(
- f"Attempted to send request to {request.url.origin} on connection "
- f"to {self._origin}"
- )
-
- with self._state_lock:
- if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
- self._request_count += 1
- self._state = HTTPConnectionState.ACTIVE
- self._expire_at = None
- else:
- raise ConnectionNotAvailable()
-
- try:
- kwargs = {"request": request}
- try:
- with Trace(
- "send_request_headers", logger, request, kwargs
- ) as trace:
- self._send_request_headers(**kwargs)
- with Trace("send_request_body", logger, request, kwargs) as trace:
- self._send_request_body(**kwargs)
- except WriteError:
- # If we get a write error while we're writing the request,
- # then we supress this error and move on to attempting to
- # read the response. Servers can sometimes close the request
- # pre-emptively and then respond with a well formed HTTP
- # error response.
- pass
-
- with Trace(
- "receive_response_headers", logger, request, kwargs
- ) as trace:
- (
- http_version,
- status,
- reason_phrase,
- headers,
- trailing_data,
- ) = self._receive_response_headers(**kwargs)
- trace.return_value = (
- http_version,
- status,
- reason_phrase,
- headers,
- )
-
- network_stream = self._network_stream
-
- # CONNECT or Upgrade request
- if (status == 101) or (
- (request.method == b"CONNECT") and (200 <= status < 300)
- ):
- network_stream = HTTP11UpgradeStream(network_stream, trailing_data)
-
- return Response(
- status=status,
- headers=headers,
- content=HTTP11ConnectionByteStream(self, request),
- extensions={
- "http_version": http_version,
- "reason_phrase": reason_phrase,
- "network_stream": network_stream,
- },
- )
- except BaseException as exc:
- with ShieldCancellation():
- with Trace("response_closed", logger, request) as trace:
- self._response_closed()
- raise exc
-
- # Sending the request...
-
- def _send_request_headers(self, request: Request) -> None:
- timeouts = request.extensions.get("timeout", {})
- timeout = timeouts.get("write", None)
-
- with map_exceptions({h11.LocalProtocolError: LocalProtocolError}):
- event = h11.Request(
- method=request.method,
- target=request.url.target,
- headers=request.headers,
- )
- self._send_event(event, timeout=timeout)
-
- def _send_request_body(self, request: Request) -> None:
- timeouts = request.extensions.get("timeout", {})
- timeout = timeouts.get("write", None)
-
- assert isinstance(request.stream, typing.Iterable)
- for chunk in request.stream:
- event = h11.Data(data=chunk)
- self._send_event(event, timeout=timeout)
-
- self._send_event(h11.EndOfMessage(), timeout=timeout)
-
- def _send_event(self, event: h11.Event, timeout: float | None = None) -> None:
- bytes_to_send = self._h11_state.send(event)
- if bytes_to_send is not None:
- self._network_stream.write(bytes_to_send, timeout=timeout)
-
- # Receiving the response...
-
- def _receive_response_headers(
- self, request: Request
- ) -> tuple[bytes, int, bytes, list[tuple[bytes, bytes]], bytes]:
- timeouts = request.extensions.get("timeout", {})
- timeout = timeouts.get("read", None)
-
- while True:
- event = self._receive_event(timeout=timeout)
- if isinstance(event, h11.Response):
- break
- if (
- isinstance(event, h11.InformationalResponse)
- and event.status_code == 101
- ):
- break
-
- http_version = b"HTTP/" + event.http_version
-
- # h11 version 0.11+ supports a `raw_items` interface to get the
- # raw header casing, rather than the enforced lowercase headers.
- headers = event.headers.raw_items()
-
- trailing_data, _ = self._h11_state.trailing_data
-
- return http_version, event.status_code, event.reason, headers, trailing_data
-
- def _receive_response_body(
- self, request: Request
- ) -> typing.Iterator[bytes]:
- timeouts = request.extensions.get("timeout", {})
- timeout = timeouts.get("read", None)
-
- while True:
- event = self._receive_event(timeout=timeout)
- if isinstance(event, h11.Data):
- yield bytes(event.data)
- elif isinstance(event, (h11.EndOfMessage, h11.PAUSED)):
- break
-
- def _receive_event(
- self, timeout: float | None = None
- ) -> h11.Event | type[h11.PAUSED]:
- while True:
- with map_exceptions({h11.RemoteProtocolError: RemoteProtocolError}):
- event = self._h11_state.next_event()
-
- if event is h11.NEED_DATA:
- data = self._network_stream.read(
- self.READ_NUM_BYTES, timeout=timeout
- )
-
- # If we feed this case through h11 we'll raise an exception like:
- #
- # httpcore.RemoteProtocolError: can't handle event type
- # ConnectionClosed when role=SERVER and state=SEND_RESPONSE
- #
- # Which is accurate, but not very informative from an end-user
- # perspective. Instead we handle this case distinctly and treat
- # it as a ConnectError.
- if data == b"" and self._h11_state.their_state == h11.SEND_RESPONSE:
- msg = "Server disconnected without sending a response."
- raise RemoteProtocolError(msg)
-
- self._h11_state.receive_data(data)
- else:
- # mypy fails to narrow the type in the above if statement above
- return event # type: ignore[return-value]
-
- def _response_closed(self) -> None:
- with self._state_lock:
- if (
- self._h11_state.our_state is h11.DONE
- and self._h11_state.their_state is h11.DONE
- ):
- self._state = HTTPConnectionState.IDLE
- self._h11_state.start_next_cycle()
- if self._keepalive_expiry is not None:
- now = time.monotonic()
- self._expire_at = now + self._keepalive_expiry
- else:
- self.close()
-
- # Once the connection is no longer required...
-
- def close(self) -> None:
- # Note that this method unilaterally closes the connection, and does
- # not have any kind of locking in place around it.
- self._state = HTTPConnectionState.CLOSED
- self._network_stream.close()
-
- # The ConnectionInterface methods provide information about the state of
- # the connection, allowing for a connection pooling implementation to
- # determine when to reuse and when to close the connection...
-
- def can_handle_request(self, origin: Origin) -> bool:
- return origin == self._origin
-
- def is_available(self) -> bool:
- # Note that HTTP/1.1 connections in the "NEW" state are not treated as
- # being "available". The control flow which created the connection will
- # be able to send an outgoing request, but the connection will not be
- # acquired from the connection pool for any other request.
- return self._state == HTTPConnectionState.IDLE
-
- def has_expired(self) -> bool:
- now = time.monotonic()
- keepalive_expired = self._expire_at is not None and now > self._expire_at
-
- # If the HTTP connection is idle but the socket is readable, then the
- # only valid state is that the socket is about to return b"", indicating
- # a server-initiated disconnect.
- server_disconnected = (
- self._state == HTTPConnectionState.IDLE
- and self._network_stream.get_extra_info("is_readable")
- )
-
- return keepalive_expired or server_disconnected
-
- def is_idle(self) -> bool:
- return self._state == HTTPConnectionState.IDLE
-
- def is_closed(self) -> bool:
- return self._state == HTTPConnectionState.CLOSED
-
- def info(self) -> str:
- origin = str(self._origin)
- return (
- f"{origin!r}, HTTP/1.1, {self._state.name}, "
- f"Request Count: {self._request_count}"
- )
-
- def __repr__(self) -> str:
- class_name = self.__class__.__name__
- origin = str(self._origin)
- return (
- f"<{class_name} [{origin!r}, {self._state.name}, "
- f"Request Count: {self._request_count}]>"
- )
-
- # These context managers are not used in the standard flow, but are
- # useful for testing or working with connection instances directly.
-
- def __enter__(self) -> HTTP11Connection:
- return self
-
- def __exit__(
- self,
- exc_type: type[BaseException] | None = None,
- exc_value: BaseException | None = None,
- traceback: types.TracebackType | None = None,
- ) -> None:
- self.close()
-
-
-class HTTP11ConnectionByteStream:
- def __init__(self, connection: HTTP11Connection, request: Request) -> None:
- self._connection = connection
- self._request = request
- self._closed = False
-
- def __iter__(self) -> typing.Iterator[bytes]:
- kwargs = {"request": self._request}
- try:
- with Trace("receive_response_body", logger, self._request, kwargs):
- for chunk in self._connection._receive_response_body(**kwargs):
- yield chunk
- except BaseException as exc:
- # If we get an exception while streaming the response,
- # we want to close the response (and possibly the connection)
- # before raising that exception.
- with ShieldCancellation():
- self.close()
- raise exc
-
- def close(self) -> None:
- if not self._closed:
- self._closed = True
- with Trace("response_closed", logger, self._request):
- self._connection._response_closed()
-
-
-class HTTP11UpgradeStream(NetworkStream):
- def __init__(self, stream: NetworkStream, leading_data: bytes) -> None:
- self._stream = stream
- self._leading_data = leading_data
-
- def read(self, max_bytes: int, timeout: float | None = None) -> bytes:
- if self._leading_data:
- buffer = self._leading_data[:max_bytes]
- self._leading_data = self._leading_data[max_bytes:]
- return buffer
- else:
- return self._stream.read(max_bytes, timeout)
-
- def write(self, buffer: bytes, timeout: float | None = None) -> None:
- self._stream.write(buffer, timeout)
-
- def close(self) -> None:
- self._stream.close()
-
- def start_tls(
- self,
- ssl_context: ssl.SSLContext,
- server_hostname: str | None = None,
- timeout: float | None = None,
- ) -> NetworkStream:
- return self._stream.start_tls(ssl_context, server_hostname, timeout)
-
- def get_extra_info(self, info: str) -> typing.Any:
- return self._stream.get_extra_info(info)
diff --git a/contrib/python/httpcore/httpcore/_sync/http2.py b/contrib/python/httpcore/httpcore/_sync/http2.py
deleted file mode 100644
index ca4dd724325..00000000000
--- a/contrib/python/httpcore/httpcore/_sync/http2.py
+++ /dev/null
@@ -1,583 +0,0 @@
-from __future__ import annotations
-
-import enum
-import logging
-import time
-import types
-import typing
-
-import h2.config
-import h2.connection
-import h2.events
-import h2.exceptions
-import h2.settings
-
-from .._backends.base import NetworkStream
-from .._exceptions import (
- ConnectionNotAvailable,
- LocalProtocolError,
- RemoteProtocolError,
-)
-from .._models import Origin, Request, Response
-from .._synchronization import Lock, Semaphore, ShieldCancellation
-from .._trace import Trace
-from .interfaces import ConnectionInterface
-
-logger = logging.getLogger("httpcore.http2")
-
-
-def has_body_headers(request: Request) -> bool:
- return any(
- k.lower() == b"content-length" or k.lower() == b"transfer-encoding"
- for k, v in request.headers
- )
-
-
-class HTTPConnectionState(enum.IntEnum):
- ACTIVE = 1
- IDLE = 2
- CLOSED = 3
-
-
-class HTTP2Connection(ConnectionInterface):
- READ_NUM_BYTES = 64 * 1024
- CONFIG = h2.config.H2Configuration(validate_inbound_headers=False)
-
- def __init__(
- self,
- origin: Origin,
- stream: NetworkStream,
- keepalive_expiry: float | None = None,
- ):
- self._origin = origin
- self._network_stream = stream
- self._keepalive_expiry: float | None = keepalive_expiry
- self._h2_state = h2.connection.H2Connection(config=self.CONFIG)
- self._state = HTTPConnectionState.IDLE
- self._expire_at: float | None = None
- self._request_count = 0
- self._init_lock = Lock()
- self._state_lock = Lock()
- self._read_lock = Lock()
- self._write_lock = Lock()
- self._sent_connection_init = False
- self._used_all_stream_ids = False
- self._connection_error = False
-
- # Mapping from stream ID to response stream events.
- self._events: dict[
- int,
- h2.events.ResponseReceived
- | h2.events.DataReceived
- | h2.events.StreamEnded
- | h2.events.StreamReset,
- ] = {}
-
- # Connection terminated events are stored as state since
- # we need to handle them for all streams.
- self._connection_terminated: h2.events.ConnectionTerminated | None = None
-
- self._read_exception: Exception | None = None
- self._write_exception: Exception | None = None
-
- def handle_request(self, request: Request) -> Response:
- if not self.can_handle_request(request.url.origin):
- # This cannot occur in normal operation, since the connection pool
- # will only send requests on connections that handle them.
- # It's in place simply for resilience as a guard against incorrect
- # usage, for anyone working directly with httpcore connections.
- raise RuntimeError(
- f"Attempted to send request to {request.url.origin} on connection "
- f"to {self._origin}"
- )
-
- with self._state_lock:
- if self._state in (HTTPConnectionState.ACTIVE, HTTPConnectionState.IDLE):
- self._request_count += 1
- self._expire_at = None
- self._state = HTTPConnectionState.ACTIVE
- else:
- raise ConnectionNotAvailable()
-
- with self._init_lock:
- if not self._sent_connection_init:
- try:
- kwargs = {"request": request}
- with Trace("send_connection_init", logger, request, kwargs):
- self._send_connection_init(**kwargs)
- except BaseException as exc:
- with ShieldCancellation():
- self.close()
- raise exc
-
- self._sent_connection_init = True
-
- # Initially start with just 1 until the remote server provides
- # its max_concurrent_streams value
- self._max_streams = 1
-
- local_settings_max_streams = (
- self._h2_state.local_settings.max_concurrent_streams
- )
- self._max_streams_semaphore = Semaphore(local_settings_max_streams)
-
- for _ in range(local_settings_max_streams - self._max_streams):
- self._max_streams_semaphore.acquire()
-
- self._max_streams_semaphore.acquire()
-
- try:
- stream_id = self._h2_state.get_next_available_stream_id()
- self._events[stream_id] = []
- except h2.exceptions.NoAvailableStreamIDError: # pragma: nocover
- self._used_all_stream_ids = True
- self._request_count -= 1
- raise ConnectionNotAvailable()
-
- try:
- kwargs = {"request": request, "stream_id": stream_id}
- with Trace("send_request_headers", logger, request, kwargs):
- self._send_request_headers(request=request, stream_id=stream_id)
- with Trace("send_request_body", logger, request, kwargs):
- self._send_request_body(request=request, stream_id=stream_id)
- with Trace(
- "receive_response_headers", logger, request, kwargs
- ) as trace:
- status, headers = self._receive_response(
- request=request, stream_id=stream_id
- )
- trace.return_value = (status, headers)
-
- return Response(
- status=status,
- headers=headers,
- content=HTTP2ConnectionByteStream(self, request, stream_id=stream_id),
- extensions={
- "http_version": b"HTTP/2",
- "network_stream": self._network_stream,
- "stream_id": stream_id,
- },
- )
- except BaseException as exc: # noqa: PIE786
- with ShieldCancellation():
- kwargs = {"stream_id": stream_id}
- with Trace("response_closed", logger, request, kwargs):
- self._response_closed(stream_id=stream_id)
-
- if isinstance(exc, h2.exceptions.ProtocolError):
- # One case where h2 can raise a protocol error is when a
- # closed frame has been seen by the state machine.
- #
- # This happens when one stream is reading, and encounters
- # a GOAWAY event. Other flows of control may then raise
- # a protocol error at any point they interact with the 'h2_state'.
- #
- # In this case we'll have stored the event, and should raise
- # it as a RemoteProtocolError.
- if self._connection_terminated: # pragma: nocover
- raise RemoteProtocolError(self._connection_terminated)
- # If h2 raises a protocol error in some other state then we
- # must somehow have made a protocol violation.
- raise LocalProtocolError(exc) # pragma: nocover
-
- raise exc
-
- def _send_connection_init(self, request: Request) -> None:
- """
- The HTTP/2 connection requires some initial setup before we can start
- using individual request/response streams on it.
- """
- # Need to set these manually here instead of manipulating via
- # __setitem__() otherwise the H2Connection will emit SettingsUpdate
- # frames in addition to sending the undesired defaults.
- self._h2_state.local_settings = h2.settings.Settings(
- client=True,
- initial_values={
- # Disable PUSH_PROMISE frames from the server since we don't do anything
- # with them for now. Maybe when we support caching?
- h2.settings.SettingCodes.ENABLE_PUSH: 0,
- # These two are taken from h2 for safe defaults
- h2.settings.SettingCodes.MAX_CONCURRENT_STREAMS: 100,
- h2.settings.SettingCodes.MAX_HEADER_LIST_SIZE: 65536,
- },
- )
-
- # Some websites (*cough* Yahoo *cough*) balk at this setting being
- # present in the initial handshake since it's not defined in the original
- # RFC despite the RFC mandating ignoring settings you don't know about.
- del self._h2_state.local_settings[
- h2.settings.SettingCodes.ENABLE_CONNECT_PROTOCOL
- ]
-
- self._h2_state.initiate_connection()
- self._h2_state.increment_flow_control_window(2**24)
- self._write_outgoing_data(request)
-
- # Sending the request...
-
- def _send_request_headers(self, request: Request, stream_id: int) -> None:
- """
- Send the request headers to a given stream ID.
- """
- end_stream = not has_body_headers(request)
-
- # In HTTP/2 the ':authority' pseudo-header is used instead of 'Host'.
- # In order to gracefully handle HTTP/1.1 and HTTP/2 we always require
- # HTTP/1.1 style headers, and map them appropriately if we end up on
- # an HTTP/2 connection.
- authority = [v for k, v in request.headers if k.lower() == b"host"][0]
-
- headers = [
- (b":method", request.method),
- (b":authority", authority),
- (b":scheme", request.url.scheme),
- (b":path", request.url.target),
- ] + [
- (k.lower(), v)
- for k, v in request.headers
- if k.lower()
- not in (
- b"host",
- b"transfer-encoding",
- )
- ]
-
- self._h2_state.send_headers(stream_id, headers, end_stream=end_stream)
- self._h2_state.increment_flow_control_window(2**24, stream_id=stream_id)
- self._write_outgoing_data(request)
-
- def _send_request_body(self, request: Request, stream_id: int) -> None:
- """
- Iterate over the request body sending it to a given stream ID.
- """
- if not has_body_headers(request):
- return
-
- assert isinstance(request.stream, typing.Iterable)
- for data in request.stream:
- self._send_stream_data(request, stream_id, data)
- self._send_end_stream(request, stream_id)
-
- def _send_stream_data(
- self, request: Request, stream_id: int, data: bytes
- ) -> None:
- """
- Send a single chunk of data in one or more data frames.
- """
- while data:
- max_flow = self._wait_for_outgoing_flow(request, stream_id)
- chunk_size = min(len(data), max_flow)
- chunk, data = data[:chunk_size], data[chunk_size:]
- self._h2_state.send_data(stream_id, chunk)
- self._write_outgoing_data(request)
-
- def _send_end_stream(self, request: Request, stream_id: int) -> None:
- """
- Send an empty data frame on on a given stream ID with the END_STREAM flag set.
- """
- self._h2_state.end_stream(stream_id)
- self._write_outgoing_data(request)
-
- # Receiving the response...
-
- def _receive_response(
- self, request: Request, stream_id: int
- ) -> tuple[int, list[tuple[bytes, bytes]]]:
- """
- Return the response status code and headers for a given stream ID.
- """
- while True:
- event = self._receive_stream_event(request, stream_id)
- if isinstance(event, h2.events.ResponseReceived):
- break
-
- status_code = 200
- headers = []
- for k, v in event.headers:
- if k == b":status":
- status_code = int(v.decode("ascii", errors="ignore"))
- elif not k.startswith(b":"):
- headers.append((k, v))
-
- return (status_code, headers)
-
- def _receive_response_body(
- self, request: Request, stream_id: int
- ) -> typing.Iterator[bytes]:
- """
- Iterator that returns the bytes of the response body for a given stream ID.
- """
- while True:
- event = self._receive_stream_event(request, stream_id)
- if isinstance(event, h2.events.DataReceived):
- amount = event.flow_controlled_length
- self._h2_state.acknowledge_received_data(amount, stream_id)
- self._write_outgoing_data(request)
- yield event.data
- elif isinstance(event, h2.events.StreamEnded):
- break
-
- def _receive_stream_event(
- self, request: Request, stream_id: int
- ) -> h2.events.ResponseReceived | h2.events.DataReceived | h2.events.StreamEnded:
- """
- Return the next available event for a given stream ID.
-
- Will read more data from the network if required.
- """
- while not self._events.get(stream_id):
- self._receive_events(request, stream_id)
- event = self._events[stream_id].pop(0)
- if isinstance(event, h2.events.StreamReset):
- raise RemoteProtocolError(event)
- return event
-
- def _receive_events(
- self, request: Request, stream_id: int | None = None
- ) -> None:
- """
- Read some data from the network until we see one or more events
- for a given stream ID.
- """
- with self._read_lock:
- if self._connection_terminated is not None:
- last_stream_id = self._connection_terminated.last_stream_id
- if stream_id and last_stream_id and stream_id > last_stream_id:
- self._request_count -= 1
- raise ConnectionNotAvailable()
- raise RemoteProtocolError(self._connection_terminated)
-
- # This conditional is a bit icky. We don't want to block reading if we've
- # actually got an event to return for a given stream. We need to do that
- # check *within* the atomic read lock. Though it also need to be optional,
- # because when we call it from `_wait_for_outgoing_flow` we *do* want to
- # block until we've available flow control, event when we have events
- # pending for the stream ID we're attempting to send on.
- if stream_id is None or not self._events.get(stream_id):
- events = self._read_incoming_data(request)
- for event in events:
- if isinstance(event, h2.events.RemoteSettingsChanged):
- with Trace(
- "receive_remote_settings", logger, request
- ) as trace:
- self._receive_remote_settings_change(event)
- trace.return_value = event
-
- elif isinstance(
- event,
- (
- h2.events.ResponseReceived,
- h2.events.DataReceived,
- h2.events.StreamEnded,
- h2.events.StreamReset,
- ),
- ):
- if event.stream_id in self._events:
- self._events[event.stream_id].append(event)
-
- elif isinstance(event, h2.events.ConnectionTerminated):
- self._connection_terminated = event
-
- self._write_outgoing_data(request)
-
- def _receive_remote_settings_change(self, event: h2.events.Event) -> None:
- max_concurrent_streams = event.changed_settings.get(
- h2.settings.SettingCodes.MAX_CONCURRENT_STREAMS
- )
- if max_concurrent_streams:
- new_max_streams = min(
- max_concurrent_streams.new_value,
- self._h2_state.local_settings.max_concurrent_streams,
- )
- if new_max_streams and new_max_streams != self._max_streams:
- while new_max_streams > self._max_streams:
- self._max_streams_semaphore.release()
- self._max_streams += 1
- while new_max_streams < self._max_streams:
- self._max_streams_semaphore.acquire()
- self._max_streams -= 1
-
- def _response_closed(self, stream_id: int) -> None:
- self._max_streams_semaphore.release()
- del self._events[stream_id]
- with self._state_lock:
- if self._connection_terminated and not self._events:
- self.close()
-
- elif self._state == HTTPConnectionState.ACTIVE and not self._events:
- self._state = HTTPConnectionState.IDLE
- if self._keepalive_expiry is not None:
- now = time.monotonic()
- self._expire_at = now + self._keepalive_expiry
- if self._used_all_stream_ids: # pragma: nocover
- self.close()
-
- def close(self) -> None:
- # Note that this method unilaterally closes the connection, and does
- # not have any kind of locking in place around it.
- self._h2_state.close_connection()
- self._state = HTTPConnectionState.CLOSED
- self._network_stream.close()
-
- # Wrappers around network read/write operations...
-
- def _read_incoming_data(self, request: Request) -> list[h2.events.Event]:
- timeouts = request.extensions.get("timeout", {})
- timeout = timeouts.get("read", None)
-
- if self._read_exception is not None:
- raise self._read_exception # pragma: nocover
-
- try:
- data = self._network_stream.read(self.READ_NUM_BYTES, timeout)
- if data == b"":
- raise RemoteProtocolError("Server disconnected")
- except Exception as exc:
- # If we get a network error we should:
- #
- # 1. Save the exception and just raise it immediately on any future reads.
- # (For example, this means that a single read timeout or disconnect will
- # immediately close all pending streams. Without requiring multiple
- # sequential timeouts.)
- # 2. Mark the connection as errored, so that we don't accept any other
- # incoming requests.
- self._read_exception = exc
- self._connection_error = True
- raise exc
-
- events: list[h2.events.Event] = self._h2_state.receive_data(data)
-
- return events
-
- def _write_outgoing_data(self, request: Request) -> None:
- timeouts = request.extensions.get("timeout", {})
- timeout = timeouts.get("write", None)
-
- with self._write_lock:
- data_to_send = self._h2_state.data_to_send()
-
- if self._write_exception is not None:
- raise self._write_exception # pragma: nocover
-
- try:
- self._network_stream.write(data_to_send, timeout)
- except Exception as exc: # pragma: nocover
- # If we get a network error we should:
- #
- # 1. Save the exception and just raise it immediately on any future write.
- # (For example, this means that a single write timeout or disconnect will
- # immediately close all pending streams. Without requiring multiple
- # sequential timeouts.)
- # 2. Mark the connection as errored, so that we don't accept any other
- # incoming requests.
- self._write_exception = exc
- self._connection_error = True
- raise exc
-
- # Flow control...
-
- def _wait_for_outgoing_flow(self, request: Request, stream_id: int) -> int:
- """
- Returns the maximum allowable outgoing flow for a given stream.
-
- If the allowable flow is zero, then waits on the network until
- WindowUpdated frames have increased the flow rate.
- https://tools.ietf.org/html/rfc7540#section-6.9
- """
- local_flow: int = self._h2_state.local_flow_control_window(stream_id)
- max_frame_size: int = self._h2_state.max_outbound_frame_size
- flow = min(local_flow, max_frame_size)
- while flow == 0:
- self._receive_events(request)
- local_flow = self._h2_state.local_flow_control_window(stream_id)
- max_frame_size = self._h2_state.max_outbound_frame_size
- flow = min(local_flow, max_frame_size)
- return flow
-
- # Interface for connection pooling...
-
- def can_handle_request(self, origin: Origin) -> bool:
- return origin == self._origin
-
- def is_available(self) -> bool:
- return (
- self._state != HTTPConnectionState.CLOSED
- and not self._connection_error
- and not self._used_all_stream_ids
- and not (
- self._h2_state.state_machine.state
- == h2.connection.ConnectionState.CLOSED
- )
- )
-
- def has_expired(self) -> bool:
- now = time.monotonic()
- return self._expire_at is not None and now > self._expire_at
-
- def is_idle(self) -> bool:
- return self._state == HTTPConnectionState.IDLE
-
- def is_closed(self) -> bool:
- return self._state == HTTPConnectionState.CLOSED
-
- def info(self) -> str:
- origin = str(self._origin)
- return (
- f"{origin!r}, HTTP/2, {self._state.name}, "
- f"Request Count: {self._request_count}"
- )
-
- def __repr__(self) -> str:
- class_name = self.__class__.__name__
- origin = str(self._origin)
- return (
- f"<{class_name} [{origin!r}, {self._state.name}, "
- f"Request Count: {self._request_count}]>"
- )
-
- # These context managers are not used in the standard flow, but are
- # useful for testing or working with connection instances directly.
-
- def __enter__(self) -> HTTP2Connection:
- return self
-
- def __exit__(
- self,
- exc_type: type[BaseException] | None = None,
- exc_value: BaseException | None = None,
- traceback: types.TracebackType | None = None,
- ) -> None:
- self.close()
-
-
-class HTTP2ConnectionByteStream:
- def __init__(
- self, connection: HTTP2Connection, request: Request, stream_id: int
- ) -> None:
- self._connection = connection
- self._request = request
- self._stream_id = stream_id
- self._closed = False
-
- def __iter__(self) -> typing.Iterator[bytes]:
- kwargs = {"request": self._request, "stream_id": self._stream_id}
- try:
- with Trace("receive_response_body", logger, self._request, kwargs):
- for chunk in self._connection._receive_response_body(
- request=self._request, stream_id=self._stream_id
- ):
- yield chunk
- except BaseException as exc:
- # If we get an exception while streaming the response,
- # we want to close the response (and possibly the connection)
- # before raising that exception.
- with ShieldCancellation():
- self.close()
- raise exc
-
- def close(self) -> None:
- if not self._closed:
- self._closed = True
- kwargs = {"stream_id": self._stream_id}
- with Trace("response_closed", logger, self._request, kwargs):
- self._connection._response_closed(stream_id=self._stream_id)
diff --git a/contrib/python/httpcore/httpcore/_sync/http_proxy.py b/contrib/python/httpcore/httpcore/_sync/http_proxy.py
deleted file mode 100644
index ecca88f7dc9..00000000000
--- a/contrib/python/httpcore/httpcore/_sync/http_proxy.py
+++ /dev/null
@@ -1,367 +0,0 @@
-from __future__ import annotations
-
-import base64
-import logging
-import ssl
-import typing
-
-from .._backends.base import SOCKET_OPTION, NetworkBackend
-from .._exceptions import ProxyError
-from .._models import (
- URL,
- Origin,
- Request,
- Response,
- enforce_bytes,
- enforce_headers,
- enforce_url,
-)
-from .._ssl import default_ssl_context
-from .._synchronization import Lock
-from .._trace import Trace
-from .connection import HTTPConnection
-from .connection_pool import ConnectionPool
-from .http11 import HTTP11Connection
-from .interfaces import ConnectionInterface
-
-ByteOrStr = typing.Union[bytes, str]
-HeadersAsSequence = typing.Sequence[typing.Tuple[ByteOrStr, ByteOrStr]]
-HeadersAsMapping = typing.Mapping[ByteOrStr, ByteOrStr]
-
-
-logger = logging.getLogger("httpcore.proxy")
-
-
-def merge_headers(
- default_headers: typing.Sequence[tuple[bytes, bytes]] | None = None,
- override_headers: typing.Sequence[tuple[bytes, bytes]] | None = None,
-) -> list[tuple[bytes, bytes]]:
- """
- Append default_headers and override_headers, de-duplicating if a key exists
- in both cases.
- """
- default_headers = [] if default_headers is None else list(default_headers)
- override_headers = [] if override_headers is None else list(override_headers)
- has_override = set(key.lower() for key, value in override_headers)
- default_headers = [
- (key, value)
- for key, value in default_headers
- if key.lower() not in has_override
- ]
- return default_headers + override_headers
-
-
-class HTTPProxy(ConnectionPool): # pragma: nocover
- """
- A connection pool that sends requests via an HTTP proxy.
- """
-
- def __init__(
- self,
- proxy_url: URL | bytes | str,
- proxy_auth: tuple[bytes | str, bytes | str] | None = None,
- proxy_headers: HeadersAsMapping | HeadersAsSequence | None = None,
- ssl_context: ssl.SSLContext | None = None,
- proxy_ssl_context: ssl.SSLContext | None = None,
- max_connections: int | None = 10,
- max_keepalive_connections: int | None = None,
- keepalive_expiry: float | None = None,
- http1: bool = True,
- http2: bool = False,
- retries: int = 0,
- local_address: str | None = None,
- uds: str | None = None,
- network_backend: NetworkBackend | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> None:
- """
- A connection pool for making HTTP requests.
-
- Parameters:
- proxy_url: The URL to use when connecting to the proxy server.
- For example `"http://127.0.0.1:8080/"`.
- proxy_auth: Any proxy authentication as a two-tuple of
- (username, password). May be either bytes or ascii-only str.
- proxy_headers: Any HTTP headers to use for the proxy requests.
- For example `{"Proxy-Authorization": "Basic <username>:<password>"}`.
- ssl_context: An SSL context to use for verifying connections.
- If not specified, the default `httpcore.default_ssl_context()`
- will be used.
- proxy_ssl_context: The same as `ssl_context`, but for a proxy server rather than a remote origin.
- max_connections: The maximum number of concurrent HTTP connections that
- the pool should allow. Any attempt to send a request on a pool that
- would exceed this amount will block until a connection is available.
- max_keepalive_connections: The maximum number of idle HTTP connections
- that will be maintained in the pool.
- keepalive_expiry: The duration in seconds that an idle HTTP connection
- may be maintained for before being expired from the pool.
- http1: A boolean indicating if HTTP/1.1 requests should be supported
- by the connection pool. Defaults to True.
- http2: A boolean indicating if HTTP/2 requests should be supported by
- the connection pool. Defaults to False.
- retries: The maximum number of retries when trying to establish
- a connection.
- local_address: Local address to connect from. Can also be used to
- connect using a particular address family. Using
- `local_address="0.0.0.0"` will connect using an `AF_INET` address
- (IPv4), while using `local_address="::"` will connect using an
- `AF_INET6` address (IPv6).
- uds: Path to a Unix Domain Socket to use instead of TCP sockets.
- network_backend: A backend instance to use for handling network I/O.
- """
- super().__init__(
- ssl_context=ssl_context,
- max_connections=max_connections,
- max_keepalive_connections=max_keepalive_connections,
- keepalive_expiry=keepalive_expiry,
- http1=http1,
- http2=http2,
- network_backend=network_backend,
- retries=retries,
- local_address=local_address,
- uds=uds,
- socket_options=socket_options,
- )
-
- self._proxy_url = enforce_url(proxy_url, name="proxy_url")
- if (
- self._proxy_url.scheme == b"http" and proxy_ssl_context is not None
- ): # pragma: no cover
- raise RuntimeError(
- "The `proxy_ssl_context` argument is not allowed for the http scheme"
- )
-
- self._ssl_context = ssl_context
- self._proxy_ssl_context = proxy_ssl_context
- self._proxy_headers = enforce_headers(proxy_headers, name="proxy_headers")
- if proxy_auth is not None:
- username = enforce_bytes(proxy_auth[0], name="proxy_auth")
- password = enforce_bytes(proxy_auth[1], name="proxy_auth")
- userpass = username + b":" + password
- authorization = b"Basic " + base64.b64encode(userpass)
- self._proxy_headers = [
- (b"Proxy-Authorization", authorization)
- ] + self._proxy_headers
-
- def create_connection(self, origin: Origin) -> ConnectionInterface:
- if origin.scheme == b"http":
- return ForwardHTTPConnection(
- proxy_origin=self._proxy_url.origin,
- proxy_headers=self._proxy_headers,
- remote_origin=origin,
- keepalive_expiry=self._keepalive_expiry,
- network_backend=self._network_backend,
- proxy_ssl_context=self._proxy_ssl_context,
- )
- return TunnelHTTPConnection(
- proxy_origin=self._proxy_url.origin,
- proxy_headers=self._proxy_headers,
- remote_origin=origin,
- ssl_context=self._ssl_context,
- proxy_ssl_context=self._proxy_ssl_context,
- keepalive_expiry=self._keepalive_expiry,
- http1=self._http1,
- http2=self._http2,
- network_backend=self._network_backend,
- )
-
-
-class ForwardHTTPConnection(ConnectionInterface):
- def __init__(
- self,
- proxy_origin: Origin,
- remote_origin: Origin,
- proxy_headers: HeadersAsMapping | HeadersAsSequence | None = None,
- keepalive_expiry: float | None = None,
- network_backend: NetworkBackend | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- proxy_ssl_context: ssl.SSLContext | None = None,
- ) -> None:
- self._connection = HTTPConnection(
- origin=proxy_origin,
- keepalive_expiry=keepalive_expiry,
- network_backend=network_backend,
- socket_options=socket_options,
- ssl_context=proxy_ssl_context,
- )
- self._proxy_origin = proxy_origin
- self._proxy_headers = enforce_headers(proxy_headers, name="proxy_headers")
- self._remote_origin = remote_origin
-
- def handle_request(self, request: Request) -> Response:
- headers = merge_headers(self._proxy_headers, request.headers)
- url = URL(
- scheme=self._proxy_origin.scheme,
- host=self._proxy_origin.host,
- port=self._proxy_origin.port,
- target=bytes(request.url),
- )
- proxy_request = Request(
- method=request.method,
- url=url,
- headers=headers,
- content=request.stream,
- extensions=request.extensions,
- )
- return self._connection.handle_request(proxy_request)
-
- def can_handle_request(self, origin: Origin) -> bool:
- return origin == self._remote_origin
-
- def close(self) -> None:
- self._connection.close()
-
- def info(self) -> str:
- return self._connection.info()
-
- def is_available(self) -> bool:
- return self._connection.is_available()
-
- def has_expired(self) -> bool:
- return self._connection.has_expired()
-
- def is_idle(self) -> bool:
- return self._connection.is_idle()
-
- def is_closed(self) -> bool:
- return self._connection.is_closed()
-
- def __repr__(self) -> str:
- return f"<{self.__class__.__name__} [{self.info()}]>"
-
-
-class TunnelHTTPConnection(ConnectionInterface):
- def __init__(
- self,
- proxy_origin: Origin,
- remote_origin: Origin,
- ssl_context: ssl.SSLContext | None = None,
- proxy_ssl_context: ssl.SSLContext | None = None,
- proxy_headers: typing.Sequence[tuple[bytes, bytes]] | None = None,
- keepalive_expiry: float | None = None,
- http1: bool = True,
- http2: bool = False,
- network_backend: NetworkBackend | None = None,
- socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
- ) -> None:
- self._connection: ConnectionInterface = HTTPConnection(
- origin=proxy_origin,
- keepalive_expiry=keepalive_expiry,
- network_backend=network_backend,
- socket_options=socket_options,
- ssl_context=proxy_ssl_context,
- )
- self._proxy_origin = proxy_origin
- self._remote_origin = remote_origin
- self._ssl_context = ssl_context
- self._proxy_ssl_context = proxy_ssl_context
- self._proxy_headers = enforce_headers(proxy_headers, name="proxy_headers")
- self._keepalive_expiry = keepalive_expiry
- self._http1 = http1
- self._http2 = http2
- self._connect_lock = Lock()
- self._connected = False
-
- def handle_request(self, request: Request) -> Response:
- timeouts = request.extensions.get("timeout", {})
- timeout = timeouts.get("connect", None)
-
- with self._connect_lock:
- if not self._connected:
- target = b"%b:%d" % (self._remote_origin.host, self._remote_origin.port)
-
- connect_url = URL(
- scheme=self._proxy_origin.scheme,
- host=self._proxy_origin.host,
- port=self._proxy_origin.port,
- target=target,
- )
- connect_headers = merge_headers(
- [(b"Host", target), (b"Accept", b"*/*")], self._proxy_headers
- )
- connect_request = Request(
- method=b"CONNECT",
- url=connect_url,
- headers=connect_headers,
- extensions=request.extensions,
- )
- connect_response = self._connection.handle_request(
- connect_request
- )
-
- if connect_response.status < 200 or connect_response.status > 299:
- reason_bytes = connect_response.extensions.get("reason_phrase", b"")
- reason_str = reason_bytes.decode("ascii", errors="ignore")
- msg = "%d %s" % (connect_response.status, reason_str)
- self._connection.close()
- raise ProxyError(msg)
-
- stream = connect_response.extensions["network_stream"]
-
- # Upgrade the stream to SSL
- ssl_context = (
- default_ssl_context()
- if self._ssl_context is None
- else self._ssl_context
- )
- alpn_protocols = ["http/1.1", "h2"] if self._http2 else ["http/1.1"]
- ssl_context.set_alpn_protocols(alpn_protocols)
-
- kwargs = {
- "ssl_context": ssl_context,
- "server_hostname": self._remote_origin.host.decode("ascii"),
- "timeout": timeout,
- }
- with Trace("start_tls", logger, request, kwargs) as trace:
- stream = stream.start_tls(**kwargs)
- trace.return_value = stream
-
- # Determine if we should be using HTTP/1.1 or HTTP/2
- ssl_object = stream.get_extra_info("ssl_object")
- http2_negotiated = (
- ssl_object is not None
- and ssl_object.selected_alpn_protocol() == "h2"
- )
-
- # Create the HTTP/1.1 or HTTP/2 connection
- if http2_negotiated or (self._http2 and not self._http1):
- from .http2 import HTTP2Connection
-
- self._connection = HTTP2Connection(
- origin=self._remote_origin,
- stream=stream,
- keepalive_expiry=self._keepalive_expiry,
- )
- else:
- self._connection = HTTP11Connection(
- origin=self._remote_origin,
- stream=stream,
- keepalive_expiry=self._keepalive_expiry,
- )
-
- self._connected = True
- return self._connection.handle_request(request)
-
- def can_handle_request(self, origin: Origin) -> bool:
- return origin == self._remote_origin
-
- def close(self) -> None:
- self._connection.close()
-
- def info(self) -> str:
- return self._connection.info()
-
- def is_available(self) -> bool:
- return self._connection.is_available()
-
- def has_expired(self) -> bool:
- return self._connection.has_expired()
-
- def is_idle(self) -> bool:
- return self._connection.is_idle()
-
- def is_closed(self) -> bool:
- return self._connection.is_closed()
-
- def __repr__(self) -> str:
- return f"<{self.__class__.__name__} [{self.info()}]>"
diff --git a/contrib/python/httpcore/httpcore/_sync/interfaces.py b/contrib/python/httpcore/httpcore/_sync/interfaces.py
deleted file mode 100644
index e673d4cc1b1..00000000000
--- a/contrib/python/httpcore/httpcore/_sync/interfaces.py
+++ /dev/null
@@ -1,137 +0,0 @@
-from __future__ import annotations
-
-import contextlib
-import typing
-
-from .._models import (
- URL,
- Extensions,
- HeaderTypes,
- Origin,
- Request,
- Response,
- enforce_bytes,
- enforce_headers,
- enforce_url,
- include_request_headers,
-)
-
-
-class RequestInterface:
- def request(
- self,
- method: bytes | str,
- url: URL | bytes | str,
- *,
- headers: HeaderTypes = None,
- content: bytes | typing.Iterator[bytes] | None = None,
- extensions: Extensions | None = None,
- ) -> Response:
- # Strict type checking on our parameters.
- method = enforce_bytes(method, name="method")
- url = enforce_url(url, name="url")
- headers = enforce_headers(headers, name="headers")
-
- # Include Host header, and optionally Content-Length or Transfer-Encoding.
- headers = include_request_headers(headers, url=url, content=content)
-
- request = Request(
- method=method,
- url=url,
- headers=headers,
- content=content,
- extensions=extensions,
- )
- response = self.handle_request(request)
- try:
- response.read()
- finally:
- response.close()
- return response
-
- @contextlib.contextmanager
- def stream(
- self,
- method: bytes | str,
- url: URL | bytes | str,
- *,
- headers: HeaderTypes = None,
- content: bytes | typing.Iterator[bytes] | None = None,
- extensions: Extensions | None = None,
- ) -> typing.Iterator[Response]:
- # Strict type checking on our parameters.
- method = enforce_bytes(method, name="method")
- url = enforce_url(url, name="url")
- headers = enforce_headers(headers, name="headers")
-
- # Include Host header, and optionally Content-Length or Transfer-Encoding.
- headers = include_request_headers(headers, url=url, content=content)
-
- request = Request(
- method=method,
- url=url,
- headers=headers,
- content=content,
- extensions=extensions,
- )
- response = self.handle_request(request)
- try:
- yield response
- finally:
- response.close()
-
- def handle_request(self, request: Request) -> Response:
- raise NotImplementedError() # pragma: nocover
-
-
-class ConnectionInterface(RequestInterface):
- def close(self) -> None:
- raise NotImplementedError() # pragma: nocover
-
- def info(self) -> str:
- raise NotImplementedError() # pragma: nocover
-
- def can_handle_request(self, origin: Origin) -> bool:
- raise NotImplementedError() # pragma: nocover
-
- def is_available(self) -> bool:
- """
- Return `True` if the connection is currently able to accept an
- outgoing request.
-
- An HTTP/1.1 connection will only be available if it is currently idle.
-
- An HTTP/2 connection will be available so long as the stream ID space is
- not yet exhausted, and the connection is not in an error state.
-
- While the connection is being established we may not yet know if it is going
- to result in an HTTP/1.1 or HTTP/2 connection. The connection should be
- treated as being available, but might ultimately raise `NewConnectionRequired`
- required exceptions if multiple requests are attempted over a connection
- that ends up being established as HTTP/1.1.
- """
- raise NotImplementedError() # pragma: nocover
-
- def has_expired(self) -> bool:
- """
- Return `True` if the connection is in a state where it should be closed.
-
- This either means that the connection is idle and it has passed the
- expiry time on its keep-alive, or that server has sent an EOF.
- """
- raise NotImplementedError() # pragma: nocover
-
- def is_idle(self) -> bool:
- """
- Return `True` if the connection is currently idle.
- """
- raise NotImplementedError() # pragma: nocover
-
- def is_closed(self) -> bool:
- """
- Return `True` if the connection has been closed.
-
- Used when a response is closed to determine if the connection may be
- returned to the connection pool or not.
- """
- raise NotImplementedError() # pragma: nocover
diff --git a/contrib/python/httpcore/httpcore/_sync/socks_proxy.py b/contrib/python/httpcore/httpcore/_sync/socks_proxy.py
deleted file mode 100644
index 0ca96ddfb58..00000000000
--- a/contrib/python/httpcore/httpcore/_sync/socks_proxy.py
+++ /dev/null
@@ -1,341 +0,0 @@
-from __future__ import annotations
-
-import logging
-import ssl
-
-import socksio
-
-from .._backends.sync import SyncBackend
-from .._backends.base import NetworkBackend, NetworkStream
-from .._exceptions import ConnectionNotAvailable, ProxyError
-from .._models import URL, Origin, Request, Response, enforce_bytes, enforce_url
-from .._ssl import default_ssl_context
-from .._synchronization import Lock
-from .._trace import Trace
-from .connection_pool import ConnectionPool
-from .http11 import HTTP11Connection
-from .interfaces import ConnectionInterface
-
-logger = logging.getLogger("httpcore.socks")
-
-
-AUTH_METHODS = {
- b"\x00": "NO AUTHENTICATION REQUIRED",
- b"\x01": "GSSAPI",
- b"\x02": "USERNAME/PASSWORD",
- b"\xff": "NO ACCEPTABLE METHODS",
-}
-
-REPLY_CODES = {
- b"\x00": "Succeeded",
- b"\x01": "General SOCKS server failure",
- b"\x02": "Connection not allowed by ruleset",
- b"\x03": "Network unreachable",
- b"\x04": "Host unreachable",
- b"\x05": "Connection refused",
- b"\x06": "TTL expired",
- b"\x07": "Command not supported",
- b"\x08": "Address type not supported",
-}
-
-
-def _init_socks5_connection(
- stream: NetworkStream,
- *,
- host: bytes,
- port: int,
- auth: tuple[bytes, bytes] | None = None,
-) -> None:
- conn = socksio.socks5.SOCKS5Connection()
-
- # Auth method request
- auth_method = (
- socksio.socks5.SOCKS5AuthMethod.NO_AUTH_REQUIRED
- if auth is None
- else socksio.socks5.SOCKS5AuthMethod.USERNAME_PASSWORD
- )
- conn.send(socksio.socks5.SOCKS5AuthMethodsRequest([auth_method]))
- outgoing_bytes = conn.data_to_send()
- stream.write(outgoing_bytes)
-
- # Auth method response
- incoming_bytes = stream.read(max_bytes=4096)
- response = conn.receive_data(incoming_bytes)
- assert isinstance(response, socksio.socks5.SOCKS5AuthReply)
- if response.method != auth_method:
- requested = AUTH_METHODS.get(auth_method, "UNKNOWN")
- responded = AUTH_METHODS.get(response.method, "UNKNOWN")
- raise ProxyError(
- f"Requested {requested} from proxy server, but got {responded}."
- )
-
- if response.method == socksio.socks5.SOCKS5AuthMethod.USERNAME_PASSWORD:
- # Username/password request
- assert auth is not None
- username, password = auth
- conn.send(socksio.socks5.SOCKS5UsernamePasswordRequest(username, password))
- outgoing_bytes = conn.data_to_send()
- stream.write(outgoing_bytes)
-
- # Username/password response
- incoming_bytes = stream.read(max_bytes=4096)
- response = conn.receive_data(incoming_bytes)
- assert isinstance(response, socksio.socks5.SOCKS5UsernamePasswordReply)
- if not response.success:
- raise ProxyError("Invalid username/password")
-
- # Connect request
- conn.send(
- socksio.socks5.SOCKS5CommandRequest.from_address(
- socksio.socks5.SOCKS5Command.CONNECT, (host, port)
- )
- )
- outgoing_bytes = conn.data_to_send()
- stream.write(outgoing_bytes)
-
- # Connect response
- incoming_bytes = stream.read(max_bytes=4096)
- response = conn.receive_data(incoming_bytes)
- assert isinstance(response, socksio.socks5.SOCKS5Reply)
- if response.reply_code != socksio.socks5.SOCKS5ReplyCode.SUCCEEDED:
- reply_code = REPLY_CODES.get(response.reply_code, "UNKOWN")
- raise ProxyError(f"Proxy Server could not connect: {reply_code}.")
-
-
-class SOCKSProxy(ConnectionPool): # pragma: nocover
- """
- A connection pool that sends requests via an HTTP proxy.
- """
-
- def __init__(
- self,
- proxy_url: URL | bytes | str,
- proxy_auth: tuple[bytes | str, bytes | str] | None = None,
- ssl_context: ssl.SSLContext | None = None,
- max_connections: int | None = 10,
- max_keepalive_connections: int | None = None,
- keepalive_expiry: float | None = None,
- http1: bool = True,
- http2: bool = False,
- retries: int = 0,
- network_backend: NetworkBackend | None = None,
- ) -> None:
- """
- A connection pool for making HTTP requests.
-
- Parameters:
- proxy_url: The URL to use when connecting to the proxy server.
- For example `"http://127.0.0.1:8080/"`.
- ssl_context: An SSL context to use for verifying connections.
- If not specified, the default `httpcore.default_ssl_context()`
- will be used.
- max_connections: The maximum number of concurrent HTTP connections that
- the pool should allow. Any attempt to send a request on a pool that
- would exceed this amount will block until a connection is available.
- max_keepalive_connections: The maximum number of idle HTTP connections
- that will be maintained in the pool.
- keepalive_expiry: The duration in seconds that an idle HTTP connection
- may be maintained for before being expired from the pool.
- http1: A boolean indicating if HTTP/1.1 requests should be supported
- by the connection pool. Defaults to True.
- http2: A boolean indicating if HTTP/2 requests should be supported by
- the connection pool. Defaults to False.
- retries: The maximum number of retries when trying to establish
- a connection.
- local_address: Local address to connect from. Can also be used to
- connect using a particular address family. Using
- `local_address="0.0.0.0"` will connect using an `AF_INET` address
- (IPv4), while using `local_address="::"` will connect using an
- `AF_INET6` address (IPv6).
- uds: Path to a Unix Domain Socket to use instead of TCP sockets.
- network_backend: A backend instance to use for handling network I/O.
- """
- super().__init__(
- ssl_context=ssl_context,
- max_connections=max_connections,
- max_keepalive_connections=max_keepalive_connections,
- keepalive_expiry=keepalive_expiry,
- http1=http1,
- http2=http2,
- network_backend=network_backend,
- retries=retries,
- )
- self._ssl_context = ssl_context
- self._proxy_url = enforce_url(proxy_url, name="proxy_url")
- if proxy_auth is not None:
- username, password = proxy_auth
- username_bytes = enforce_bytes(username, name="proxy_auth")
- password_bytes = enforce_bytes(password, name="proxy_auth")
- self._proxy_auth: tuple[bytes, bytes] | None = (
- username_bytes,
- password_bytes,
- )
- else:
- self._proxy_auth = None
-
- def create_connection(self, origin: Origin) -> ConnectionInterface:
- return Socks5Connection(
- proxy_origin=self._proxy_url.origin,
- remote_origin=origin,
- proxy_auth=self._proxy_auth,
- ssl_context=self._ssl_context,
- keepalive_expiry=self._keepalive_expiry,
- http1=self._http1,
- http2=self._http2,
- network_backend=self._network_backend,
- )
-
-
-class Socks5Connection(ConnectionInterface):
- def __init__(
- self,
- proxy_origin: Origin,
- remote_origin: Origin,
- proxy_auth: tuple[bytes, bytes] | None = None,
- ssl_context: ssl.SSLContext | None = None,
- keepalive_expiry: float | None = None,
- http1: bool = True,
- http2: bool = False,
- network_backend: NetworkBackend | None = None,
- ) -> None:
- self._proxy_origin = proxy_origin
- self._remote_origin = remote_origin
- self._proxy_auth = proxy_auth
- self._ssl_context = ssl_context
- self._keepalive_expiry = keepalive_expiry
- self._http1 = http1
- self._http2 = http2
-
- self._network_backend: NetworkBackend = (
- SyncBackend() if network_backend is None else network_backend
- )
- self._connect_lock = Lock()
- self._connection: ConnectionInterface | None = None
- self._connect_failed = False
-
- def handle_request(self, request: Request) -> Response:
- timeouts = request.extensions.get("timeout", {})
- sni_hostname = request.extensions.get("sni_hostname", None)
- timeout = timeouts.get("connect", None)
-
- with self._connect_lock:
- if self._connection is None:
- try:
- # Connect to the proxy
- kwargs = {
- "host": self._proxy_origin.host.decode("ascii"),
- "port": self._proxy_origin.port,
- "timeout": timeout,
- }
- with Trace("connect_tcp", logger, request, kwargs) as trace:
- stream = self._network_backend.connect_tcp(**kwargs)
- trace.return_value = stream
-
- # Connect to the remote host using socks5
- kwargs = {
- "stream": stream,
- "host": self._remote_origin.host.decode("ascii"),
- "port": self._remote_origin.port,
- "auth": self._proxy_auth,
- }
- with Trace(
- "setup_socks5_connection", logger, request, kwargs
- ) as trace:
- _init_socks5_connection(**kwargs)
- trace.return_value = stream
-
- # Upgrade the stream to SSL
- if self._remote_origin.scheme == b"https":
- ssl_context = (
- default_ssl_context()
- if self._ssl_context is None
- else self._ssl_context
- )
- alpn_protocols = (
- ["http/1.1", "h2"] if self._http2 else ["http/1.1"]
- )
- ssl_context.set_alpn_protocols(alpn_protocols)
-
- kwargs = {
- "ssl_context": ssl_context,
- "server_hostname": sni_hostname
- or self._remote_origin.host.decode("ascii"),
- "timeout": timeout,
- }
- with Trace("start_tls", logger, request, kwargs) as trace:
- stream = stream.start_tls(**kwargs)
- trace.return_value = stream
-
- # Determine if we should be using HTTP/1.1 or HTTP/2
- ssl_object = stream.get_extra_info("ssl_object")
- http2_negotiated = (
- ssl_object is not None
- and ssl_object.selected_alpn_protocol() == "h2"
- )
-
- # Create the HTTP/1.1 or HTTP/2 connection
- if http2_negotiated or (
- self._http2 and not self._http1
- ): # pragma: nocover
- from .http2 import HTTP2Connection
-
- self._connection = HTTP2Connection(
- origin=self._remote_origin,
- stream=stream,
- keepalive_expiry=self._keepalive_expiry,
- )
- else:
- self._connection = HTTP11Connection(
- origin=self._remote_origin,
- stream=stream,
- keepalive_expiry=self._keepalive_expiry,
- )
- except Exception as exc:
- self._connect_failed = True
- raise exc
- elif not self._connection.is_available(): # pragma: nocover
- raise ConnectionNotAvailable()
-
- return self._connection.handle_request(request)
-
- def can_handle_request(self, origin: Origin) -> bool:
- return origin == self._remote_origin
-
- def close(self) -> None:
- if self._connection is not None:
- self._connection.close()
-
- def is_available(self) -> bool:
- if self._connection is None: # pragma: nocover
- # If HTTP/2 support is enabled, and the resulting connection could
- # end up as HTTP/2 then we should indicate the connection as being
- # available to service multiple requests.
- return (
- self._http2
- and (self._remote_origin.scheme == b"https" or not self._http1)
- and not self._connect_failed
- )
- return self._connection.is_available()
-
- def has_expired(self) -> bool:
- if self._connection is None: # pragma: nocover
- return self._connect_failed
- return self._connection.has_expired()
-
- def is_idle(self) -> bool:
- if self._connection is None: # pragma: nocover
- return self._connect_failed
- return self._connection.is_idle()
-
- def is_closed(self) -> bool:
- if self._connection is None: # pragma: nocover
- return self._connect_failed
- return self._connection.is_closed()
-
- def info(self) -> str:
- if self._connection is None: # pragma: nocover
- return "CONNECTION FAILED" if self._connect_failed else "CONNECTING"
- return self._connection.info()
-
- def __repr__(self) -> str:
- return f"<{self.__class__.__name__} [{self.info()}]>"
diff --git a/contrib/python/httpcore/httpcore/_synchronization.py b/contrib/python/httpcore/httpcore/_synchronization.py
deleted file mode 100644
index 2ecc9e9c363..00000000000
--- a/contrib/python/httpcore/httpcore/_synchronization.py
+++ /dev/null
@@ -1,318 +0,0 @@
-from __future__ import annotations
-
-import threading
-import types
-
-from ._exceptions import ExceptionMapping, PoolTimeout, map_exceptions
-
-# Our async synchronization primatives use either 'anyio' or 'trio' depending
-# on if they're running under asyncio or trio.
-
-try:
- import trio
-except (ImportError, NotImplementedError): # pragma: nocover
- trio = None # type: ignore
-
-try:
- import anyio
-except ImportError: # pragma: nocover
- anyio = None # type: ignore
-
-
-def current_async_library() -> str:
- # Determine if we're running under trio or asyncio.
- # See https://sniffio.readthedocs.io/en/latest/
- try:
- import sniffio
- except ImportError: # pragma: nocover
- environment = "asyncio"
- else:
- environment = sniffio.current_async_library()
-
- if environment not in ("asyncio", "trio"): # pragma: nocover
- raise RuntimeError("Running under an unsupported async environment.")
-
- if environment == "asyncio" and anyio is None: # pragma: nocover
- raise RuntimeError(
- "Running with asyncio requires installation of 'httpcore[asyncio]'."
- )
-
- if environment == "trio" and trio is None: # pragma: nocover
- raise RuntimeError(
- "Running with trio requires installation of 'httpcore[trio]'."
- )
-
- return environment
-
-
-class AsyncLock:
- """
- This is a standard lock.
-
- In the sync case `Lock` provides thread locking.
- In the async case `AsyncLock` provides async locking.
- """
-
- def __init__(self) -> None:
- self._backend = ""
-
- def setup(self) -> None:
- """
- Detect if we're running under 'asyncio' or 'trio' and create
- a lock with the correct implementation.
- """
- self._backend = current_async_library()
- if self._backend == "trio":
- self._trio_lock = trio.Lock()
- elif self._backend == "asyncio":
- self._anyio_lock = anyio.Lock()
-
- async def __aenter__(self) -> AsyncLock:
- if not self._backend:
- self.setup()
-
- if self._backend == "trio":
- await self._trio_lock.acquire()
- elif self._backend == "asyncio":
- await self._anyio_lock.acquire()
-
- return self
-
- async def __aexit__(
- self,
- exc_type: type[BaseException] | None = None,
- exc_value: BaseException | None = None,
- traceback: types.TracebackType | None = None,
- ) -> None:
- if self._backend == "trio":
- self._trio_lock.release()
- elif self._backend == "asyncio":
- self._anyio_lock.release()
-
-
-class AsyncThreadLock:
- """
- This is a threading-only lock for no-I/O contexts.
-
- In the sync case `ThreadLock` provides thread locking.
- In the async case `AsyncThreadLock` is a no-op.
- """
-
- def __enter__(self) -> AsyncThreadLock:
- return self
-
- def __exit__(
- self,
- exc_type: type[BaseException] | None = None,
- exc_value: BaseException | None = None,
- traceback: types.TracebackType | None = None,
- ) -> None:
- pass
-
-
-class AsyncEvent:
- def __init__(self) -> None:
- self._backend = ""
-
- def setup(self) -> None:
- """
- Detect if we're running under 'asyncio' or 'trio' and create
- a lock with the correct implementation.
- """
- self._backend = current_async_library()
- if self._backend == "trio":
- self._trio_event = trio.Event()
- elif self._backend == "asyncio":
- self._anyio_event = anyio.Event()
-
- def set(self) -> None:
- if not self._backend:
- self.setup()
-
- if self._backend == "trio":
- self._trio_event.set()
- elif self._backend == "asyncio":
- self._anyio_event.set()
-
- async def wait(self, timeout: float | None = None) -> None:
- if not self._backend:
- self.setup()
-
- if self._backend == "trio":
- trio_exc_map: ExceptionMapping = {trio.TooSlowError: PoolTimeout}
- timeout_or_inf = float("inf") if timeout is None else timeout
- with map_exceptions(trio_exc_map):
- with trio.fail_after(timeout_or_inf):
- await self._trio_event.wait()
- elif self._backend == "asyncio":
- anyio_exc_map: ExceptionMapping = {TimeoutError: PoolTimeout}
- with map_exceptions(anyio_exc_map):
- with anyio.fail_after(timeout):
- await self._anyio_event.wait()
-
-
-class AsyncSemaphore:
- def __init__(self, bound: int) -> None:
- self._bound = bound
- self._backend = ""
-
- def setup(self) -> None:
- """
- Detect if we're running under 'asyncio' or 'trio' and create
- a semaphore with the correct implementation.
- """
- self._backend = current_async_library()
- if self._backend == "trio":
- self._trio_semaphore = trio.Semaphore(
- initial_value=self._bound, max_value=self._bound
- )
- elif self._backend == "asyncio":
- self._anyio_semaphore = anyio.Semaphore(
- initial_value=self._bound, max_value=self._bound
- )
-
- async def acquire(self) -> None:
- if not self._backend:
- self.setup()
-
- if self._backend == "trio":
- await self._trio_semaphore.acquire()
- elif self._backend == "asyncio":
- await self._anyio_semaphore.acquire()
-
- async def release(self) -> None:
- if self._backend == "trio":
- self._trio_semaphore.release()
- elif self._backend == "asyncio":
- self._anyio_semaphore.release()
-
-
-class AsyncShieldCancellation:
- # For certain portions of our codebase where we're dealing with
- # closing connections during exception handling we want to shield
- # the operation from being cancelled.
- #
- # with AsyncShieldCancellation():
- # ... # clean-up operations, shielded from cancellation.
-
- def __init__(self) -> None:
- """
- Detect if we're running under 'asyncio' or 'trio' and create
- a shielded scope with the correct implementation.
- """
- self._backend = current_async_library()
-
- if self._backend == "trio":
- self._trio_shield = trio.CancelScope(shield=True)
- elif self._backend == "asyncio":
- self._anyio_shield = anyio.CancelScope(shield=True)
-
- def __enter__(self) -> AsyncShieldCancellation:
- if self._backend == "trio":
- self._trio_shield.__enter__()
- elif self._backend == "asyncio":
- self._anyio_shield.__enter__()
- return self
-
- def __exit__(
- self,
- exc_type: type[BaseException] | None = None,
- exc_value: BaseException | None = None,
- traceback: types.TracebackType | None = None,
- ) -> None:
- if self._backend == "trio":
- self._trio_shield.__exit__(exc_type, exc_value, traceback)
- elif self._backend == "asyncio":
- self._anyio_shield.__exit__(exc_type, exc_value, traceback)
-
-
-# Our thread-based synchronization primitives...
-
-
-class Lock:
- """
- This is a standard lock.
-
- In the sync case `Lock` provides thread locking.
- In the async case `AsyncLock` provides async locking.
- """
-
- def __init__(self) -> None:
- self._lock = threading.Lock()
-
- def __enter__(self) -> Lock:
- self._lock.acquire()
- return self
-
- def __exit__(
- self,
- exc_type: type[BaseException] | None = None,
- exc_value: BaseException | None = None,
- traceback: types.TracebackType | None = None,
- ) -> None:
- self._lock.release()
-
-
-class ThreadLock:
- """
- This is a threading-only lock for no-I/O contexts.
-
- In the sync case `ThreadLock` provides thread locking.
- In the async case `AsyncThreadLock` is a no-op.
- """
-
- def __init__(self) -> None:
- self._lock = threading.Lock()
-
- def __enter__(self) -> ThreadLock:
- self._lock.acquire()
- return self
-
- def __exit__(
- self,
- exc_type: type[BaseException] | None = None,
- exc_value: BaseException | None = None,
- traceback: types.TracebackType | None = None,
- ) -> None:
- self._lock.release()
-
-
-class Event:
- def __init__(self) -> None:
- self._event = threading.Event()
-
- def set(self) -> None:
- self._event.set()
-
- def wait(self, timeout: float | None = None) -> None:
- if timeout == float("inf"): # pragma: no cover
- timeout = None
- if not self._event.wait(timeout=timeout):
- raise PoolTimeout() # pragma: nocover
-
-
-class Semaphore:
- def __init__(self, bound: int) -> None:
- self._semaphore = threading.Semaphore(value=bound)
-
- def acquire(self) -> None:
- self._semaphore.acquire()
-
- def release(self) -> None:
- self._semaphore.release()
-
-
-class ShieldCancellation:
- # Thread-synchronous codebases don't support cancellation semantics.
- # We have this class because we need to mirror the async and sync
- # cases within our package, but it's just a no-op.
- def __enter__(self) -> ShieldCancellation:
- return self
-
- def __exit__(
- self,
- exc_type: type[BaseException] | None = None,
- exc_value: BaseException | None = None,
- traceback: types.TracebackType | None = None,
- ) -> None:
- pass
diff --git a/contrib/python/httpcore/httpcore/_trace.py b/contrib/python/httpcore/httpcore/_trace.py
deleted file mode 100644
index 5f1cd7c4782..00000000000
--- a/contrib/python/httpcore/httpcore/_trace.py
+++ /dev/null
@@ -1,107 +0,0 @@
-from __future__ import annotations
-
-import inspect
-import logging
-import types
-import typing
-
-from ._models import Request
-
-
-class Trace:
- def __init__(
- self,
- name: str,
- logger: logging.Logger,
- request: Request | None = None,
- kwargs: dict[str, typing.Any] | None = None,
- ) -> None:
- self.name = name
- self.logger = logger
- self.trace_extension = (
- None if request is None else request.extensions.get("trace")
- )
- self.debug = self.logger.isEnabledFor(logging.DEBUG)
- self.kwargs = kwargs or {}
- self.return_value: typing.Any = None
- self.should_trace = self.debug or self.trace_extension is not None
- self.prefix = self.logger.name.split(".")[-1]
-
- def trace(self, name: str, info: dict[str, typing.Any]) -> None:
- if self.trace_extension is not None:
- prefix_and_name = f"{self.prefix}.{name}"
- ret = self.trace_extension(prefix_and_name, info)
- if inspect.iscoroutine(ret): # pragma: no cover
- raise TypeError(
- "If you are using a synchronous interface, "
- "the callback of the `trace` extension should "
- "be a normal function instead of an asynchronous function."
- )
-
- if self.debug:
- if not info or "return_value" in info and info["return_value"] is None:
- message = name
- else:
- args = " ".join([f"{key}={value!r}" for key, value in info.items()])
- message = f"{name} {args}"
- self.logger.debug(message)
-
- def __enter__(self) -> Trace:
- if self.should_trace:
- info = self.kwargs
- self.trace(f"{self.name}.started", info)
- return self
-
- def __exit__(
- self,
- exc_type: type[BaseException] | None = None,
- exc_value: BaseException | None = None,
- traceback: types.TracebackType | None = None,
- ) -> None:
- if self.should_trace:
- if exc_value is None:
- info = {"return_value": self.return_value}
- self.trace(f"{self.name}.complete", info)
- else:
- info = {"exception": exc_value}
- self.trace(f"{self.name}.failed", info)
-
- async def atrace(self, name: str, info: dict[str, typing.Any]) -> None:
- if self.trace_extension is not None:
- prefix_and_name = f"{self.prefix}.{name}"
- coro = self.trace_extension(prefix_and_name, info)
- if not inspect.iscoroutine(coro): # pragma: no cover
- raise TypeError(
- "If you're using an asynchronous interface, "
- "the callback of the `trace` extension should "
- "be an asynchronous function rather than a normal function."
- )
- await coro
-
- if self.debug:
- if not info or "return_value" in info and info["return_value"] is None:
- message = name
- else:
- args = " ".join([f"{key}={value!r}" for key, value in info.items()])
- message = f"{name} {args}"
- self.logger.debug(message)
-
- async def __aenter__(self) -> Trace:
- if self.should_trace:
- info = self.kwargs
- await self.atrace(f"{self.name}.started", info)
- return self
-
- async def __aexit__(
- self,
- exc_type: type[BaseException] | None = None,
- exc_value: BaseException | None = None,
- traceback: types.TracebackType | None = None,
- ) -> None:
- if self.should_trace:
- if exc_value is None:
- info = {"return_value": self.return_value}
- await self.atrace(f"{self.name}.complete", info)
- else:
- info = {"exception": exc_value}
- await self.atrace(f"{self.name}.failed", info)
diff --git a/contrib/python/httpcore/httpcore/_utils.py b/contrib/python/httpcore/httpcore/_utils.py
deleted file mode 100644
index c44ff93cb2f..00000000000
--- a/contrib/python/httpcore/httpcore/_utils.py
+++ /dev/null
@@ -1,37 +0,0 @@
-from __future__ import annotations
-
-import select
-import socket
-import sys
-
-
-def is_socket_readable(sock: socket.socket | None) -> bool:
- """
- Return whether a socket, as identifed by its file descriptor, is readable.
- "A socket is readable" means that the read buffer isn't empty, i.e. that calling
- .recv() on it would immediately return some data.
- """
- # NOTE: we want check for readability without actually attempting to read, because
- # we don't want to block forever if it's not readable.
-
- # In the case that the socket no longer exists, or cannot return a file
- # descriptor, we treat it as being readable, as if it the next read operation
- # on it is ready to return the terminating `b""`.
- sock_fd = None if sock is None else sock.fileno()
- if sock_fd is None or sock_fd < 0: # pragma: nocover
- return True
-
- # The implementation below was stolen from:
- # https://github.com/python-trio/trio/blob/20ee2b1b7376db637435d80e266212a35837ddcc/trio/_socket.py#L471-L478
- # See also: https://github.com/encode/httpcore/pull/193#issuecomment-703129316
-
- # Use select.select on Windows, and when poll is unavailable and select.poll
- # everywhere else. (E.g. When eventlet is in use. See #327)
- if (
- sys.platform == "win32" or getattr(select, "poll", None) is None
- ): # pragma: nocover
- rready, _, _ = select.select([sock_fd], [], [], 0)
- return bool(rready)
- p = select.poll()
- p.register(sock_fd, select.POLLIN)
- return bool(p.poll(0))
diff --git a/contrib/python/httpcore/httpcore/py.typed b/contrib/python/httpcore/httpcore/py.typed
deleted file mode 100644
index e69de29bb2d..00000000000
--- a/contrib/python/httpcore/httpcore/py.typed
+++ /dev/null
diff --git a/contrib/python/httpcore/ya.make b/contrib/python/httpcore/ya.make
deleted file mode 100644
index 6d4f3507cab..00000000000
--- a/contrib/python/httpcore/ya.make
+++ /dev/null
@@ -1,66 +0,0 @@
-# Generated by devtools/yamaker (pypi).
-
-PY3_LIBRARY()
-
-VERSION(1.0.7)
-
-LICENSE(BSD-3-Clause)
-
-PEERDIR(
- contrib/python/certifi
- contrib/python/h11
-)
-
-NO_LINT()
-
-NO_CHECK_IMPORTS(
- httpcore._async.http2
- httpcore._async.socks_proxy
- httpcore._backends.trio
- httpcore._sync.http2
- httpcore._sync.socks_proxy
-)
-
-PY_SRCS(
- TOP_LEVEL
- httpcore/__init__.py
- httpcore/_api.py
- httpcore/_async/__init__.py
- httpcore/_async/connection.py
- httpcore/_async/connection_pool.py
- httpcore/_async/http11.py
- httpcore/_async/http2.py
- httpcore/_async/http_proxy.py
- httpcore/_async/interfaces.py
- httpcore/_async/socks_proxy.py
- httpcore/_backends/__init__.py
- httpcore/_backends/anyio.py
- httpcore/_backends/auto.py
- httpcore/_backends/base.py
- httpcore/_backends/mock.py
- httpcore/_backends/sync.py
- httpcore/_backends/trio.py
- httpcore/_exceptions.py
- httpcore/_models.py
- httpcore/_ssl.py
- httpcore/_sync/__init__.py
- httpcore/_sync/connection.py
- httpcore/_sync/connection_pool.py
- httpcore/_sync/http11.py
- httpcore/_sync/http2.py
- httpcore/_sync/http_proxy.py
- httpcore/_sync/interfaces.py
- httpcore/_sync/socks_proxy.py
- httpcore/_synchronization.py
- httpcore/_trace.py
- httpcore/_utils.py
-)
-
-RESOURCE_FILES(
- PREFIX contrib/python/httpcore/
- .dist-info/METADATA
- .dist-info/top_level.txt
- httpcore/py.typed
-)
-
-END()