aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/web/_newclient.py
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2023-11-26 18:16:14 +0300
committershmel1k <shmel1k@ydb.tech>2023-11-26 18:43:30 +0300
commitb8cf9e88f4c5c64d9406af533d8948deb050d695 (patch)
tree218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py3/twisted/web/_newclient.py
parent523f645a83a0ec97a0332dbc3863bb354c92a328 (diff)
downloadydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/web/_newclient.py')
-rw-r--r--contrib/python/Twisted/py3/twisted/web/_newclient.py1727
1 files changed, 1727 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py3/twisted/web/_newclient.py b/contrib/python/Twisted/py3/twisted/web/_newclient.py
new file mode 100644
index 0000000000..6fd1ac21ba
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/web/_newclient.py
@@ -0,0 +1,1727 @@
+# -*- test-case-name: twisted.web.test.test_newclient -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+An U{HTTP 1.1<http://www.w3.org/Protocols/rfc2616/rfc2616.html>} client.
+
+The way to use the functionality provided by this module is to:
+
+ - Connect a L{HTTP11ClientProtocol} to an HTTP server
+ - Create a L{Request} with the appropriate data
+ - Pass the request to L{HTTP11ClientProtocol.request}
+ - The returned Deferred will fire with a L{Response} object
+ - Create a L{IProtocol} provider which can handle the response body
+ - Connect it to the response with L{Response.deliverBody}
+ - When the protocol's C{connectionLost} method is called, the response is
+ complete. See L{Response.deliverBody} for details.
+
+Various other classes in this module support this usage:
+
+ - HTTPParser is the basic HTTP parser. It can handle the parts of HTTP which
+ are symmetric between requests and responses.
+
+ - HTTPClientParser extends HTTPParser to handle response-specific parts of
+ HTTP. One instance is created for each request to parse the corresponding
+ response.
+"""
+
+import re
+
+from zope.interface import implementer
+
+from twisted.internet.defer import (
+ CancelledError,
+ Deferred,
+ fail,
+ maybeDeferred,
+ succeed,
+)
+from twisted.internet.error import ConnectionDone
+from twisted.internet.interfaces import IConsumer, IPushProducer
+from twisted.internet.protocol import Protocol
+from twisted.logger import Logger
+from twisted.protocols.basic import LineReceiver
+from twisted.python.compat import networkString
+from twisted.python.components import proxyForInterface
+from twisted.python.failure import Failure
+from twisted.python.reflect import fullyQualifiedName
+from twisted.web.http import (
+ NO_CONTENT,
+ NOT_MODIFIED,
+ PotentialDataLoss,
+ _ChunkedTransferDecoder,
+ _DataLoss,
+ _IdentityTransferDecoder,
+)
+from twisted.web.http_headers import Headers
+from twisted.web.iweb import UNKNOWN_LENGTH, IClientRequest, IResponse
+
+# States HTTPParser can be in
+STATUS = "STATUS"
+HEADER = "HEADER"
+BODY = "BODY"
+DONE = "DONE"
+_moduleLog = Logger()
+
+
+class BadHeaders(Exception):
+ """
+ Headers passed to L{Request} were in some way invalid.
+ """
+
+
+class ExcessWrite(Exception):
+ """
+ The body L{IBodyProducer} for a request tried to write data after
+ indicating it had finished writing data.
+ """
+
+
+class ParseError(Exception):
+ """
+ Some received data could not be parsed.
+
+ @ivar data: The string which could not be parsed.
+ """
+
+ def __init__(self, reason, data):
+ Exception.__init__(self, reason, data)
+ self.data = data
+
+
+class BadResponseVersion(ParseError):
+ """
+ The version string in a status line was unparsable.
+ """
+
+
+class _WrapperException(Exception):
+ """
+ L{_WrapperException} is the base exception type for exceptions which
+ include one or more other exceptions as the low-level causes.
+
+ @ivar reasons: A L{list} of one or more L{Failure} instances encountered
+ during an HTTP request. See subclass documentation for more details.
+ """
+
+ def __init__(self, reasons):
+ Exception.__init__(self, reasons)
+ self.reasons = reasons
+
+
+class RequestGenerationFailed(_WrapperException):
+ """
+ There was an error while creating the bytes which make up a request.
+
+ @ivar reasons: A C{list} of one or more L{Failure} instances giving the
+ reasons the request generation was considered to have failed.
+ """
+
+
+class RequestTransmissionFailed(_WrapperException):
+ """
+ There was an error while sending the bytes which make up a request.
+
+ @ivar reasons: A C{list} of one or more L{Failure} instances giving the
+ reasons the request transmission was considered to have failed.
+ """
+
+
+class ConnectionAborted(Exception):
+ """
+ The connection was explicitly aborted by application code.
+ """
+
+
+class WrongBodyLength(Exception):
+ """
+ An L{IBodyProducer} declared the number of bytes it was going to
+ produce (via its C{length} attribute) and then produced a different number
+ of bytes.
+ """
+
+
+class ResponseDone(Exception):
+ """
+ L{ResponseDone} may be passed to L{IProtocol.connectionLost} on the
+ protocol passed to L{Response.deliverBody} and indicates that the entire
+ response has been delivered.
+ """
+
+
+class ResponseFailed(_WrapperException):
+ """
+ L{ResponseFailed} indicates that all of the response to a request was not
+ received for some reason.
+
+ @ivar reasons: A C{list} of one or more L{Failure} instances giving the
+ reasons the response was considered to have failed.
+
+ @ivar response: If specified, the L{Response} received from the server (and
+ in particular the status code and the headers).
+ """
+
+ def __init__(self, reasons, response=None):
+ _WrapperException.__init__(self, reasons)
+ self.response = response
+
+
+class ResponseNeverReceived(ResponseFailed):
+ """
+ A L{ResponseFailed} that knows no response bytes at all have been received.
+ """
+
+
+class RequestNotSent(Exception):
+ """
+ L{RequestNotSent} indicates that an attempt was made to issue a request but
+ for reasons unrelated to the details of the request itself, the request
+ could not be sent. For example, this may indicate that an attempt was made
+ to send a request using a protocol which is no longer connected to a
+ server.
+ """
+
+
+def _callAppFunction(function):
+ """
+ Call C{function}. If it raises an exception, log it with a minimal
+ description of the source.
+
+ @return: L{None}
+ """
+ try:
+ function()
+ except BaseException:
+ _moduleLog.failure(
+ "Unexpected exception from {name}", name=fullyQualifiedName(function)
+ )
+
+
+class HTTPParser(LineReceiver):
+ """
+ L{HTTPParser} handles the parsing side of HTTP processing. With a suitable
+ subclass, it can parse either the client side or the server side of the
+ connection.
+
+ @ivar headers: All of the non-connection control message headers yet
+ received.
+
+ @ivar state: State indicator for the response parsing state machine. One
+ of C{STATUS}, C{HEADER}, C{BODY}, C{DONE}.
+
+ @ivar _partialHeader: L{None} or a C{list} of the lines of a multiline
+ header while that header is being received.
+ """
+
+ # NOTE: According to HTTP spec, we're supposed to eat the
+ # 'Proxy-Authenticate' and 'Proxy-Authorization' headers also, but that
+ # doesn't sound like a good idea to me, because it makes it impossible to
+ # have a non-authenticating transparent proxy in front of an authenticating
+ # proxy. An authenticating proxy can eat them itself. -jknight
+ #
+ # Further, quoting
+ # http://homepages.tesco.net/J.deBoynePollard/FGA/web-proxy-connection-header.html
+ # regarding the 'Proxy-Connection' header:
+ #
+ # The Proxy-Connection: header is a mistake in how some web browsers
+ # use HTTP. Its name is the result of a false analogy. It is not a
+ # standard part of the protocol. There is a different standard
+ # protocol mechanism for doing what it does. And its existence
+ # imposes a requirement upon HTTP servers such that no proxy HTTP
+ # server can be standards-conforming in practice.
+ #
+ # -exarkun
+
+ # Some servers (like http://news.ycombinator.com/) return status lines and
+ # HTTP headers delimited by \n instead of \r\n.
+ delimiter = b"\n"
+
+ CONNECTION_CONTROL_HEADERS = {
+ b"content-length",
+ b"connection",
+ b"keep-alive",
+ b"te",
+ b"trailers",
+ b"transfer-encoding",
+ b"upgrade",
+ b"proxy-connection",
+ }
+
+ def connectionMade(self):
+ self.headers = Headers()
+ self.connHeaders = Headers()
+ self.state = STATUS
+ self._partialHeader = None
+
+ def switchToBodyMode(self, decoder):
+ """
+ Switch to body parsing mode - interpret any more bytes delivered as
+ part of the message body and deliver them to the given decoder.
+ """
+ if self.state == BODY:
+ raise RuntimeError("already in body mode")
+
+ self.bodyDecoder = decoder
+ self.state = BODY
+ self.setRawMode()
+
+ def lineReceived(self, line):
+ """
+ Handle one line from a response.
+ """
+ # Handle the normal CR LF case.
+ if line[-1:] == b"\r":
+ line = line[:-1]
+
+ if self.state == STATUS:
+ self.statusReceived(line)
+ self.state = HEADER
+ elif self.state == HEADER:
+ if not line or line[0] not in b" \t":
+ if self._partialHeader is not None:
+ header = b"".join(self._partialHeader)
+ name, value = header.split(b":", 1)
+ value = value.strip()
+ self.headerReceived(name, value)
+ if not line:
+ # Empty line means the header section is over.
+ self.allHeadersReceived()
+ else:
+ # Line not beginning with LWS is another header.
+ self._partialHeader = [line]
+ else:
+ # A line beginning with LWS is a continuation of a header
+ # begun on a previous line.
+ self._partialHeader.append(line)
+
+ def rawDataReceived(self, data):
+ """
+ Pass data from the message body to the body decoder object.
+ """
+ self.bodyDecoder.dataReceived(data)
+
+ def isConnectionControlHeader(self, name):
+ """
+ Return C{True} if the given lower-cased name is the name of a
+ connection control header (rather than an entity header).
+
+ According to RFC 2616, section 14.10, the tokens in the Connection
+ header are probably relevant here. However, I am not sure what the
+ practical consequences of either implementing or ignoring that are.
+ So I leave it unimplemented for the time being.
+ """
+ return name in self.CONNECTION_CONTROL_HEADERS
+
+ def statusReceived(self, status):
+ """
+ Callback invoked whenever the first line of a new message is received.
+ Override this.
+
+ @param status: The first line of an HTTP request or response message
+ without trailing I{CR LF}.
+ @type status: C{bytes}
+ """
+
+ def headerReceived(self, name, value):
+ """
+ Store the given header in C{self.headers}.
+ """
+ name = name.lower()
+ if self.isConnectionControlHeader(name):
+ headers = self.connHeaders
+ else:
+ headers = self.headers
+ headers.addRawHeader(name, value)
+
+ def allHeadersReceived(self):
+ """
+ Callback invoked after the last header is passed to C{headerReceived}.
+ Override this to change to the C{BODY} or C{DONE} state.
+ """
+ self.switchToBodyMode(None)
+
+
+class HTTPClientParser(HTTPParser):
+ """
+ An HTTP parser which only handles HTTP responses.
+
+ @ivar request: The request with which the expected response is associated.
+ @type request: L{Request}
+
+ @ivar NO_BODY_CODES: A C{set} of response codes which B{MUST NOT} have a
+ body.
+
+ @ivar finisher: A callable to invoke when this response is fully parsed.
+
+ @ivar _responseDeferred: A L{Deferred} which will be called back with the
+ response when all headers in the response have been received.
+ Thereafter, L{None}.
+
+ @ivar _everReceivedData: C{True} if any bytes have been received.
+ """
+
+ NO_BODY_CODES = {NO_CONTENT, NOT_MODIFIED}
+
+ _transferDecoders = {
+ b"chunked": _ChunkedTransferDecoder,
+ }
+
+ bodyDecoder = None
+ _log = Logger()
+
+ def __init__(self, request, finisher):
+ self.request = request
+ self.finisher = finisher
+ self._responseDeferred = Deferred()
+ self._everReceivedData = False
+
+ def dataReceived(self, data):
+ """
+ Override so that we know if any response has been received.
+ """
+ self._everReceivedData = True
+ HTTPParser.dataReceived(self, data)
+
+ def parseVersion(self, strversion):
+ """
+ Parse version strings of the form Protocol '/' Major '.' Minor. E.g.
+ b'HTTP/1.1'. Returns (protocol, major, minor). Will raise ValueError
+ on bad syntax.
+ """
+ try:
+ proto, strnumber = strversion.split(b"/")
+ major, minor = strnumber.split(b".")
+ major, minor = int(major), int(minor)
+ except ValueError as e:
+ raise BadResponseVersion(str(e), strversion)
+ if major < 0 or minor < 0:
+ raise BadResponseVersion("version may not be negative", strversion)
+ return (proto, major, minor)
+
+ def statusReceived(self, status):
+ """
+ Parse the status line into its components and create a response object
+ to keep track of this response's state.
+ """
+ parts = status.split(b" ", 2)
+ if len(parts) == 2:
+ # Some broken servers omit the required `phrase` portion of
+ # `status-line`. One such server identified as
+ # "cloudflare-nginx". Others fail to identify themselves
+ # entirely. Fill in an empty phrase for such cases.
+ version, codeBytes = parts
+ phrase = b""
+ elif len(parts) == 3:
+ version, codeBytes, phrase = parts
+ else:
+ raise ParseError("wrong number of parts", status)
+
+ try:
+ statusCode = int(codeBytes)
+ except ValueError:
+ raise ParseError("non-integer status code", status)
+
+ self.response = Response._construct(
+ self.parseVersion(version),
+ statusCode,
+ phrase,
+ self.headers,
+ self.transport,
+ self.request,
+ )
+
+ def _finished(self, rest):
+ """
+ Called to indicate that an entire response has been received. No more
+ bytes will be interpreted by this L{HTTPClientParser}. Extra bytes are
+ passed up and the state of this L{HTTPClientParser} is set to I{DONE}.
+
+ @param rest: A C{bytes} giving any extra bytes delivered to this
+ L{HTTPClientParser} which are not part of the response being
+ parsed.
+ """
+ self.state = DONE
+ self.finisher(rest)
+
+ def isConnectionControlHeader(self, name):
+ """
+ Content-Length in the response to a HEAD request is an entity header,
+ not a connection control header.
+ """
+ if self.request.method == b"HEAD" and name == b"content-length":
+ return False
+ return HTTPParser.isConnectionControlHeader(self, name)
+
+ def allHeadersReceived(self):
+ """
+ Figure out how long the response body is going to be by examining
+ headers and stuff.
+ """
+ if 100 <= self.response.code < 200:
+ # RFC 7231 Section 6.2 says that if we receive a 1XX status code
+ # and aren't expecting it, we MAY ignore it. That's what we're
+ # going to do. We reset the parser here, but we leave
+ # _everReceivedData in its True state because we have, in fact,
+ # received data.
+ self._log.info(
+ "Ignoring unexpected {code} response", code=self.response.code
+ )
+ self.connectionMade()
+ del self.response
+ return
+
+ if self.response.code in self.NO_BODY_CODES or self.request.method == b"HEAD":
+ self.response.length = 0
+ # The order of the next two lines might be of interest when adding
+ # support for pipelining.
+ self._finished(self.clearLineBuffer())
+ self.response._bodyDataFinished()
+ else:
+ transferEncodingHeaders = self.connHeaders.getRawHeaders(
+ b"transfer-encoding"
+ )
+ if transferEncodingHeaders:
+ # This could be a KeyError. However, that would mean we do not
+ # know how to decode the response body, so failing the request
+ # is as good a behavior as any. Perhaps someday we will want
+ # to normalize/document/test this specifically, but failing
+ # seems fine to me for now.
+ transferDecoder = self._transferDecoders[
+ transferEncodingHeaders[0].lower()
+ ]
+
+ # If anyone ever invents a transfer encoding other than
+ # chunked (yea right), and that transfer encoding can predict
+ # the length of the response body, it might be sensible to
+ # allow the transfer decoder to set the response object's
+ # length attribute.
+ else:
+ contentLengthHeaders = self.connHeaders.getRawHeaders(b"content-length")
+ if contentLengthHeaders is None:
+ contentLength = None
+ elif len(contentLengthHeaders) == 1:
+ contentLength = int(contentLengthHeaders[0])
+ self.response.length = contentLength
+ else:
+ # "HTTP Message Splitting" or "HTTP Response Smuggling"
+ # potentially happening. Or it's just a buggy server.
+ raise ValueError(
+ "Too many Content-Length headers; " "response is invalid"
+ )
+
+ if contentLength == 0:
+ self._finished(self.clearLineBuffer())
+ transferDecoder = None
+ else:
+ transferDecoder = lambda x, y: _IdentityTransferDecoder(
+ contentLength, x, y
+ )
+
+ if transferDecoder is None:
+ self.response._bodyDataFinished()
+ else:
+ # Make sure as little data as possible from the response body
+ # gets delivered to the response object until the response
+ # object actually indicates it is ready to handle bytes
+ # (probably because an application gave it a way to interpret
+ # them).
+ self.transport.pauseProducing()
+ self.switchToBodyMode(
+ transferDecoder(self.response._bodyDataReceived, self._finished)
+ )
+
+ # This must be last. If it were first, then application code might
+ # change some state (for example, registering a protocol to receive the
+ # response body). Then the pauseProducing above would be wrong since
+ # the response is ready for bytes and nothing else would ever resume
+ # the transport.
+ self._responseDeferred.callback(self.response)
+ del self._responseDeferred
+
+ def connectionLost(self, reason):
+ if self.bodyDecoder is not None:
+ try:
+ try:
+ self.bodyDecoder.noMoreData()
+ except PotentialDataLoss:
+ self.response._bodyDataFinished(Failure())
+ except _DataLoss:
+ self.response._bodyDataFinished(
+ Failure(ResponseFailed([reason, Failure()], self.response))
+ )
+ else:
+ self.response._bodyDataFinished()
+ except BaseException:
+ # Handle exceptions from both the except suites and the else
+ # suite. Those functions really shouldn't raise exceptions,
+ # but maybe there's some buggy application code somewhere
+ # making things difficult.
+ self._log.failure("")
+ elif self.state != DONE:
+ if self._everReceivedData:
+ exceptionClass = ResponseFailed
+ else:
+ exceptionClass = ResponseNeverReceived
+ self._responseDeferred.errback(Failure(exceptionClass([reason])))
+ del self._responseDeferred
+
+
+_VALID_METHOD = re.compile(
+ rb"\A[%s]+\Z"
+ % (
+ bytes().join(
+ (
+ b"!",
+ b"#",
+ b"$",
+ b"%",
+ b"&",
+ b"'",
+ b"*",
+ b"+",
+ b"-",
+ b".",
+ b"^",
+ b"_",
+ b"`",
+ b"|",
+ b"~",
+ b"\x30-\x39",
+ b"\x41-\x5a",
+ b"\x61-\x7A",
+ ),
+ ),
+ ),
+)
+
+
+def _ensureValidMethod(method):
+ """
+ An HTTP method is an HTTP token, which consists of any visible
+ ASCII character that is not a delimiter (i.e. one of
+ C{"(),/:;<=>?@[\\]{}}.)
+
+ @param method: the method to check
+ @type method: L{bytes}
+
+ @return: the method if it is valid
+ @rtype: L{bytes}
+
+ @raise ValueError: if the method is not valid
+
+ @see: U{https://tools.ietf.org/html/rfc7230#section-3.1.1},
+ U{https://tools.ietf.org/html/rfc7230#section-3.2.6},
+ U{https://tools.ietf.org/html/rfc5234#appendix-B.1}
+ """
+ if _VALID_METHOD.match(method):
+ return method
+ raise ValueError(f"Invalid method {method!r}")
+
+
+_VALID_URI = re.compile(rb"\A[\x21-\x7e]+\Z")
+
+
+def _ensureValidURI(uri):
+ """
+ A valid URI cannot contain control characters (i.e., characters
+ between 0-32, inclusive and 127) or non-ASCII characters (i.e.,
+ characters with values between 128-255, inclusive).
+
+ @param uri: the URI to check
+ @type uri: L{bytes}
+
+ @return: the URI if it is valid
+ @rtype: L{bytes}
+
+ @raise ValueError: if the URI is not valid
+
+ @see: U{https://tools.ietf.org/html/rfc3986#section-3.3},
+ U{https://tools.ietf.org/html/rfc3986#appendix-A},
+ U{https://tools.ietf.org/html/rfc5234#appendix-B.1}
+ """
+ if _VALID_URI.match(uri):
+ return uri
+ raise ValueError(f"Invalid URI {uri!r}")
+
+
+@implementer(IClientRequest)
+class Request:
+ """
+ A L{Request} instance describes an HTTP request to be sent to an HTTP
+ server.
+
+ @ivar method: See L{__init__}.
+ @ivar uri: See L{__init__}.
+ @ivar headers: See L{__init__}.
+ @ivar bodyProducer: See L{__init__}.
+ @ivar persistent: See L{__init__}.
+
+ @ivar _parsedURI: Parsed I{URI} for the request, or L{None}.
+ @type _parsedURI: L{twisted.web.client.URI} or L{None}
+ """
+
+ _log = Logger()
+
+ def __init__(self, method, uri, headers, bodyProducer, persistent=False):
+ """
+ @param method: The HTTP method for this request, ex: b'GET', b'HEAD',
+ b'POST', etc.
+ @type method: L{bytes}
+
+ @param uri: The relative URI of the resource to request. For example,
+ C{b'/foo/bar?baz=quux'}.
+ @type uri: L{bytes}
+
+ @param headers: Headers to be sent to the server. It is important to
+ note that this object does not create any implicit headers. So it
+ is up to the HTTP Client to add required headers such as 'Host'.
+ @type headers: L{twisted.web.http_headers.Headers}
+
+ @param bodyProducer: L{None} or an L{IBodyProducer} provider which
+ produces the content body to send to the remote HTTP server.
+
+ @param persistent: Set to C{True} when you use HTTP persistent
+ connection, defaults to C{False}.
+ @type persistent: L{bool}
+ """
+ self.method = _ensureValidMethod(method)
+ self.uri = _ensureValidURI(uri)
+ self.headers = headers
+ self.bodyProducer = bodyProducer
+ self.persistent = persistent
+ self._parsedURI = None
+
+ @classmethod
+ def _construct(
+ cls, method, uri, headers, bodyProducer, persistent=False, parsedURI=None
+ ):
+ """
+ Private constructor.
+
+ @param method: See L{__init__}.
+ @param uri: See L{__init__}.
+ @param headers: See L{__init__}.
+ @param bodyProducer: See L{__init__}.
+ @param persistent: See L{__init__}.
+ @param parsedURI: See L{Request._parsedURI}.
+
+ @return: L{Request} instance.
+ """
+ request = cls(method, uri, headers, bodyProducer, persistent)
+ request._parsedURI = parsedURI
+ return request
+
+ @property
+ def absoluteURI(self):
+ """
+ The absolute URI of the request as C{bytes}, or L{None} if the
+ absolute URI cannot be determined.
+ """
+ return getattr(self._parsedURI, "toBytes", lambda: None)()
+
+ def _writeHeaders(self, transport, TEorCL):
+ hosts = self.headers.getRawHeaders(b"host", ())
+ if len(hosts) != 1:
+ raise BadHeaders("Exactly one Host header required")
+
+ # In the future, having the protocol version be a parameter to this
+ # method would probably be good. It would be nice if this method
+ # weren't limited to issuing HTTP/1.1 requests.
+ requestLines = []
+ requestLines.append(
+ b" ".join(
+ [
+ _ensureValidMethod(self.method),
+ _ensureValidURI(self.uri),
+ b"HTTP/1.1\r\n",
+ ]
+ ),
+ )
+ if not self.persistent:
+ requestLines.append(b"Connection: close\r\n")
+ if TEorCL is not None:
+ requestLines.append(TEorCL)
+ for name, values in self.headers.getAllRawHeaders():
+ requestLines.extend([name + b": " + v + b"\r\n" for v in values])
+ requestLines.append(b"\r\n")
+ transport.writeSequence(requestLines)
+
+ def _writeToBodyProducerChunked(self, transport):
+ """
+ Write this request to the given transport using chunked
+ transfer-encoding to frame the body.
+
+ @param transport: See L{writeTo}.
+ @return: See L{writeTo}.
+ """
+ self._writeHeaders(transport, b"Transfer-Encoding: chunked\r\n")
+ encoder = ChunkedEncoder(transport)
+ encoder.registerProducer(self.bodyProducer, True)
+ d = self.bodyProducer.startProducing(encoder)
+
+ def cbProduced(ignored):
+ encoder.unregisterProducer()
+
+ def ebProduced(err):
+ encoder._allowNoMoreWrites()
+ # Don't call the encoder's unregisterProducer because it will write
+ # a zero-length chunk. This would indicate to the server that the
+ # request body is complete. There was an error, though, so we
+ # don't want to do that.
+ transport.unregisterProducer()
+ return err
+
+ d.addCallbacks(cbProduced, ebProduced)
+ return d
+
+ def _writeToBodyProducerContentLength(self, transport):
+ """
+ Write this request to the given transport using content-length to frame
+ the body.
+
+ @param transport: See L{writeTo}.
+ @return: See L{writeTo}.
+ """
+ self._writeHeaders(
+ transport,
+ networkString("Content-Length: %d\r\n" % (self.bodyProducer.length,)),
+ )
+
+ # This Deferred is used to signal an error in the data written to the
+ # encoder below. It can only errback and it will only do so before too
+ # many bytes have been written to the encoder and before the producer
+ # Deferred fires.
+ finishedConsuming = Deferred()
+
+ # This makes sure the producer writes the correct number of bytes for
+ # the request body.
+ encoder = LengthEnforcingConsumer(
+ self.bodyProducer, transport, finishedConsuming
+ )
+
+ transport.registerProducer(self.bodyProducer, True)
+
+ finishedProducing = self.bodyProducer.startProducing(encoder)
+
+ def combine(consuming, producing):
+ # This Deferred is returned and will be fired when the first of
+ # consuming or producing fires. If it's cancelled, forward that
+ # cancellation to the producer.
+ def cancelConsuming(ign):
+ finishedProducing.cancel()
+
+ ultimate = Deferred(cancelConsuming)
+
+ # Keep track of what has happened so far. This initially
+ # contains None, then an integer uniquely identifying what
+ # sequence of events happened. See the callbacks and errbacks
+ # defined below for the meaning of each value.
+ state = [None]
+
+ def ebConsuming(err):
+ if state == [None]:
+ # The consuming Deferred failed first. This means the
+ # overall writeTo Deferred is going to errback now. The
+ # producing Deferred should not fire later (because the
+ # consumer should have called stopProducing on the
+ # producer), but if it does, a callback will be ignored
+ # and an errback will be logged.
+ state[0] = 1
+ ultimate.errback(err)
+ else:
+ # The consuming Deferred errbacked after the producing
+ # Deferred fired. This really shouldn't ever happen.
+ # If it does, I goofed. Log the error anyway, just so
+ # there's a chance someone might notice and complain.
+ self._log.failure(
+ "Buggy state machine in {request}/[{state}]: "
+ "ebConsuming called",
+ failure=err,
+ request=repr(self),
+ state=state[0],
+ )
+
+ def cbProducing(result):
+ if state == [None]:
+ # The producing Deferred succeeded first. Nothing will
+ # ever happen to the consuming Deferred. Tell the
+ # encoder we're done so it can check what the producer
+ # wrote and make sure it was right.
+ state[0] = 2
+ try:
+ encoder._noMoreWritesExpected()
+ except BaseException:
+ # Fail the overall writeTo Deferred - something the
+ # producer did was wrong.
+ ultimate.errback()
+ else:
+ # Success - succeed the overall writeTo Deferred.
+ ultimate.callback(None)
+ # Otherwise, the consuming Deferred already errbacked. The
+ # producing Deferred wasn't supposed to fire, but it did
+ # anyway. It's buggy, but there's not really anything to be
+ # done about it. Just ignore this result.
+
+ def ebProducing(err):
+ if state == [None]:
+ # The producing Deferred failed first. This means the
+ # overall writeTo Deferred is going to errback now.
+ # Tell the encoder that we're done so it knows to reject
+ # further writes from the producer (which should not
+ # happen, but the producer may be buggy).
+ state[0] = 3
+ encoder._allowNoMoreWrites()
+ ultimate.errback(err)
+ else:
+ # The producing Deferred failed after the consuming
+ # Deferred failed. It shouldn't have, so it's buggy.
+ # Log the exception in case anyone who can fix the code
+ # is watching.
+ self._log.failure("Producer is buggy", failure=err)
+
+ consuming.addErrback(ebConsuming)
+ producing.addCallbacks(cbProducing, ebProducing)
+
+ return ultimate
+
+ d = combine(finishedConsuming, finishedProducing)
+
+ def f(passthrough):
+ # Regardless of what happens with the overall Deferred, once it
+ # fires, the producer registered way up above the definition of
+ # combine should be unregistered.
+ transport.unregisterProducer()
+ return passthrough
+
+ d.addBoth(f)
+ return d
+
+ def _writeToEmptyBodyContentLength(self, transport):
+ """
+ Write this request to the given transport using content-length to frame
+ the (empty) body.
+
+ @param transport: See L{writeTo}.
+ @return: See L{writeTo}.
+ """
+ self._writeHeaders(transport, b"Content-Length: 0\r\n")
+ return succeed(None)
+
+ def writeTo(self, transport):
+ """
+ Format this L{Request} as an HTTP/1.1 request and write it to the given
+ transport. If bodyProducer is not None, it will be associated with an
+ L{IConsumer}.
+
+ @param transport: The transport to which to write.
+ @type transport: L{twisted.internet.interfaces.ITransport} provider
+
+ @return: A L{Deferred} which fires with L{None} when the request has
+ been completely written to the transport or with a L{Failure} if
+ there is any problem generating the request bytes.
+ """
+ if self.bodyProducer is None:
+ # If the method semantics anticipate a body, include a
+ # Content-Length even if it is 0.
+ # https://tools.ietf.org/html/rfc7230#section-3.3.2
+ if self.method in (b"PUT", b"POST"):
+ self._writeToEmptyBodyContentLength(transport)
+ else:
+ self._writeHeaders(transport, None)
+ elif self.bodyProducer.length is UNKNOWN_LENGTH:
+ return self._writeToBodyProducerChunked(transport)
+ else:
+ return self._writeToBodyProducerContentLength(transport)
+
+ def stopWriting(self):
+ """
+ Stop writing this request to the transport. This can only be called
+ after C{writeTo} and before the L{Deferred} returned by C{writeTo}
+ fires. It should cancel any asynchronous task started by C{writeTo}.
+ The L{Deferred} returned by C{writeTo} need not be fired if this method
+ is called.
+ """
+ # If bodyProducer is None, then the Deferred returned by writeTo has
+ # fired already and this method cannot be called.
+ _callAppFunction(self.bodyProducer.stopProducing)
+
+
+class LengthEnforcingConsumer:
+ """
+ An L{IConsumer} proxy which enforces an exact length requirement on the
+ total data written to it.
+
+ @ivar _length: The number of bytes remaining to be written.
+
+ @ivar _producer: The L{IBodyProducer} which is writing to this
+ consumer.
+
+ @ivar _consumer: The consumer to which at most C{_length} bytes will be
+ forwarded.
+
+ @ivar _finished: A L{Deferred} which will be fired with a L{Failure} if too
+ many bytes are written to this consumer.
+ """
+
+ def __init__(self, producer, consumer, finished):
+ self._length = producer.length
+ self._producer = producer
+ self._consumer = consumer
+ self._finished = finished
+
+ def _allowNoMoreWrites(self):
+ """
+ Indicate that no additional writes are allowed. Attempts to write
+ after calling this method will be met with an exception.
+ """
+ self._finished = None
+
+ def write(self, bytes):
+ """
+ Write C{bytes} to the underlying consumer unless
+ C{_noMoreWritesExpected} has been called or there are/have been too
+ many bytes.
+ """
+ if self._finished is None:
+ # No writes are supposed to happen any more. Try to convince the
+ # calling code to stop calling this method by calling its
+ # stopProducing method and then throwing an exception at it. This
+ # exception isn't documented as part of the API because you're
+ # never supposed to expect it: only buggy code will ever receive
+ # it.
+ self._producer.stopProducing()
+ raise ExcessWrite()
+
+ if len(bytes) <= self._length:
+ self._length -= len(bytes)
+ self._consumer.write(bytes)
+ else:
+ # No synchronous exception is raised in *this* error path because
+ # we still have _finished which we can use to report the error to a
+ # better place than the direct caller of this method (some
+ # arbitrary application code).
+ _callAppFunction(self._producer.stopProducing)
+ self._finished.errback(WrongBodyLength("too many bytes written"))
+ self._allowNoMoreWrites()
+
+ def _noMoreWritesExpected(self):
+ """
+ Called to indicate no more bytes will be written to this consumer.
+ Check to see that the correct number have been written.
+
+ @raise WrongBodyLength: If not enough bytes have been written.
+ """
+ if self._finished is not None:
+ self._allowNoMoreWrites()
+ if self._length:
+ raise WrongBodyLength("too few bytes written")
+
+
+def makeStatefulDispatcher(name, template):
+ """
+ Given a I{dispatch} name and a function, return a function which can be
+ used as a method and which, when called, will call another method defined
+ on the instance and return the result. The other method which is called is
+ determined by the value of the C{_state} attribute of the instance.
+
+ @param name: A string which is used to construct the name of the subsidiary
+ method to invoke. The subsidiary method is named like C{'_%s_%s' %
+ (name, _state)}.
+
+ @param template: A function object which is used to give the returned
+ function a docstring.
+
+ @return: The dispatcher function.
+ """
+
+ def dispatcher(self, *args, **kwargs):
+ func = getattr(self, "_" + name + "_" + self._state, None)
+ if func is None:
+ raise RuntimeError(f"{self!r} has no {name} method in state {self._state}")
+ return func(*args, **kwargs)
+
+ dispatcher.__doc__ = template.__doc__
+ return dispatcher
+
+
+# This proxy class is used only in the private constructor of the Response
+# class below, in order to prevent users relying on any property of the
+# concrete request object: they can only use what is provided by
+# IClientRequest.
+_ClientRequestProxy = proxyForInterface(IClientRequest)
+
+
+@implementer(IResponse)
+class Response:
+ """
+ A L{Response} instance describes an HTTP response received from an HTTP
+ server.
+
+ L{Response} should not be subclassed or instantiated.
+
+ @ivar _transport: See L{__init__}.
+
+ @ivar _bodyProtocol: The L{IProtocol} provider to which the body is
+ delivered. L{None} before one has been registered with
+ C{deliverBody}.
+
+ @ivar _bodyBuffer: A C{list} of the strings passed to C{bodyDataReceived}
+ before C{deliverBody} is called. L{None} afterwards.
+
+ @ivar _state: Indicates what state this L{Response} instance is in,
+ particularly with respect to delivering bytes from the response body
+ to an application-supplied protocol object. This may be one of
+ C{'INITIAL'}, C{'CONNECTED'}, C{'DEFERRED_CLOSE'}, or C{'FINISHED'},
+ with the following meanings:
+
+ - INITIAL: This is the state L{Response} objects start in. No
+ protocol has yet been provided and the underlying transport may
+ still have bytes to deliver to it.
+
+ - DEFERRED_CLOSE: If the underlying transport indicates all bytes
+ have been delivered but no application-provided protocol is yet
+ available, the L{Response} moves to this state. Data is
+ buffered and waiting for a protocol to be delivered to.
+
+ - CONNECTED: If a protocol is provided when the state is INITIAL,
+ the L{Response} moves to this state. Any buffered data is
+ delivered and any data which arrives from the transport
+ subsequently is given directly to the protocol.
+
+ - FINISHED: If a protocol is provided in the DEFERRED_CLOSE state,
+ the L{Response} moves to this state after delivering all
+ buffered data to the protocol. Otherwise, if the L{Response} is
+ in the CONNECTED state, if the transport indicates there is no
+ more data, the L{Response} moves to this state. Nothing else
+ can happen once the L{Response} is in this state.
+ @type _state: C{str}
+ """
+
+ length = UNKNOWN_LENGTH
+
+ _bodyProtocol = None
+ _bodyFinished = False
+
+ def __init__(self, version, code, phrase, headers, _transport):
+ """
+ @param version: HTTP version components protocol, major, minor. E.g.
+ C{(b'HTTP', 1, 1)} to mean C{b'HTTP/1.1'}.
+
+ @param code: HTTP status code.
+ @type code: L{int}
+
+ @param phrase: HTTP reason phrase, intended to give a short description
+ of the HTTP status code.
+
+ @param headers: HTTP response headers.
+ @type headers: L{twisted.web.http_headers.Headers}
+
+ @param _transport: The transport which is delivering this response.
+ """
+ self.version = version
+ self.code = code
+ self.phrase = phrase
+ self.headers = headers
+ self._transport = _transport
+ self._bodyBuffer = []
+ self._state = "INITIAL"
+ self.request = None
+ self.previousResponse = None
+
+ @classmethod
+ def _construct(cls, version, code, phrase, headers, _transport, request):
+ """
+ Private constructor.
+
+ @param version: See L{__init__}.
+ @param code: See L{__init__}.
+ @param phrase: See L{__init__}.
+ @param headers: See L{__init__}.
+ @param _transport: See L{__init__}.
+ @param request: See L{IResponse.request}.
+
+ @return: L{Response} instance.
+ """
+ response = Response(version, code, phrase, headers, _transport)
+ response.request = _ClientRequestProxy(request)
+ return response
+
+ def setPreviousResponse(self, previousResponse):
+ self.previousResponse = previousResponse
+
+ def deliverBody(self, protocol):
+ """
+ Dispatch the given L{IProtocol} depending of the current state of the
+ response.
+ """
+
+ deliverBody = makeStatefulDispatcher("deliverBody", deliverBody)
+
+ def _deliverBody_INITIAL(self, protocol):
+ """
+ Deliver any buffered data to C{protocol} and prepare to deliver any
+ future data to it. Move to the C{'CONNECTED'} state.
+ """
+ protocol.makeConnection(self._transport)
+ self._bodyProtocol = protocol
+ for data in self._bodyBuffer:
+ self._bodyProtocol.dataReceived(data)
+ self._bodyBuffer = None
+
+ self._state = "CONNECTED"
+
+ # Now that there's a protocol to consume the body, resume the
+ # transport. It was previously paused by HTTPClientParser to avoid
+ # reading too much data before it could be handled. We need to do this
+ # after we transition our state as it may recursively lead to more data
+ # being delivered, or even the body completing.
+ self._transport.resumeProducing()
+
+ def _deliverBody_CONNECTED(self, protocol):
+ """
+ It is invalid to attempt to deliver data to a protocol when it is
+ already being delivered to another protocol.
+ """
+ raise RuntimeError(
+ "Response already has protocol %r, cannot deliverBody "
+ "again" % (self._bodyProtocol,)
+ )
+
+ def _deliverBody_DEFERRED_CLOSE(self, protocol):
+ """
+ Deliver any buffered data to C{protocol} and then disconnect the
+ protocol. Move to the C{'FINISHED'} state.
+ """
+ # Unlike _deliverBody_INITIAL, there is no need to resume the
+ # transport here because all of the response data has been received
+ # already. Some higher level code may want to resume the transport if
+ # that code expects further data to be received over it.
+
+ protocol.makeConnection(self._transport)
+
+ for data in self._bodyBuffer:
+ protocol.dataReceived(data)
+ self._bodyBuffer = None
+ protocol.connectionLost(self._reason)
+ self._state = "FINISHED"
+
+ def _deliverBody_FINISHED(self, protocol):
+ """
+ It is invalid to attempt to deliver data to a protocol after the
+ response body has been delivered to another protocol.
+ """
+ raise RuntimeError("Response already finished, cannot deliverBody now.")
+
+ def _bodyDataReceived(self, data):
+ """
+ Called by HTTPClientParser with chunks of data from the response body.
+ They will be buffered or delivered to the protocol passed to
+ deliverBody.
+ """
+
+ _bodyDataReceived = makeStatefulDispatcher("bodyDataReceived", _bodyDataReceived)
+
+ def _bodyDataReceived_INITIAL(self, data):
+ """
+ Buffer any data received for later delivery to a protocol passed to
+ C{deliverBody}.
+
+ Little or no data should be buffered by this method, since the
+ transport has been paused and will not be resumed until a protocol
+ is supplied.
+ """
+ self._bodyBuffer.append(data)
+
+ def _bodyDataReceived_CONNECTED(self, data):
+ """
+ Deliver any data received to the protocol to which this L{Response}
+ is connected.
+ """
+ self._bodyProtocol.dataReceived(data)
+
+ def _bodyDataReceived_DEFERRED_CLOSE(self, data):
+ """
+ It is invalid for data to be delivered after it has been indicated
+ that the response body has been completely delivered.
+ """
+ raise RuntimeError("Cannot receive body data after _bodyDataFinished")
+
+ def _bodyDataReceived_FINISHED(self, data):
+ """
+ It is invalid for data to be delivered after the response body has
+ been delivered to a protocol.
+ """
+ raise RuntimeError("Cannot receive body data after " "protocol disconnected")
+
+ def _bodyDataFinished(self, reason=None):
+ """
+ Called by HTTPClientParser when no more body data is available. If the
+ optional reason is supplied, this indicates a problem or potential
+ problem receiving all of the response body.
+ """
+
+ _bodyDataFinished = makeStatefulDispatcher("bodyDataFinished", _bodyDataFinished)
+
+ def _bodyDataFinished_INITIAL(self, reason=None):
+ """
+ Move to the C{'DEFERRED_CLOSE'} state to wait for a protocol to
+ which to deliver the response body.
+ """
+ self._state = "DEFERRED_CLOSE"
+ if reason is None:
+ reason = Failure(ResponseDone("Response body fully received"))
+ self._reason = reason
+
+ def _bodyDataFinished_CONNECTED(self, reason=None):
+ """
+ Disconnect the protocol and move to the C{'FINISHED'} state.
+ """
+ if reason is None:
+ reason = Failure(ResponseDone("Response body fully received"))
+ self._bodyProtocol.connectionLost(reason)
+ self._bodyProtocol = None
+ self._state = "FINISHED"
+
+ def _bodyDataFinished_DEFERRED_CLOSE(self):
+ """
+ It is invalid to attempt to notify the L{Response} of the end of the
+ response body data more than once.
+ """
+ raise RuntimeError("Cannot finish body data more than once")
+
+ def _bodyDataFinished_FINISHED(self):
+ """
+ It is invalid to attempt to notify the L{Response} of the end of the
+ response body data more than once.
+ """
+ raise RuntimeError("Cannot finish body data after " "protocol disconnected")
+
+
+@implementer(IConsumer)
+class ChunkedEncoder:
+ """
+ Helper object which exposes L{IConsumer} on top of L{HTTP11ClientProtocol}
+ for streaming request bodies to the server.
+ """
+
+ def __init__(self, transport):
+ self.transport = transport
+
+ def _allowNoMoreWrites(self):
+ """
+ Indicate that no additional writes are allowed. Attempts to write
+ after calling this method will be met with an exception.
+ """
+ self.transport = None
+
+ def registerProducer(self, producer, streaming):
+ """
+ Register the given producer with C{self.transport}.
+ """
+ self.transport.registerProducer(producer, streaming)
+
+ def write(self, data):
+ """
+ Write the given request body bytes to the transport using chunked
+ encoding.
+
+ @type data: C{bytes}
+ """
+ if self.transport is None:
+ raise ExcessWrite()
+ self.transport.writeSequence(
+ (networkString("%x\r\n" % len(data)), data, b"\r\n")
+ )
+
+ def unregisterProducer(self):
+ """
+ Indicate that the request body is complete and finish the request.
+ """
+ self.write(b"")
+ self.transport.unregisterProducer()
+ self._allowNoMoreWrites()
+
+
+@implementer(IPushProducer)
+class TransportProxyProducer:
+ """
+ An L{twisted.internet.interfaces.IPushProducer} implementation which
+ wraps another such thing and proxies calls to it until it is told to stop.
+
+ @ivar _producer: The wrapped L{twisted.internet.interfaces.IPushProducer}
+ provider or L{None} after this proxy has been stopped.
+ """
+
+ # LineReceiver uses this undocumented attribute of transports to decide
+ # when to stop calling lineReceived or rawDataReceived (if it finds it to
+ # be true, it doesn't bother to deliver any more data). Set disconnecting
+ # to False here and never change it to true so that all data is always
+ # delivered to us and so that LineReceiver doesn't fail with an
+ # AttributeError.
+ disconnecting = False
+
+ def __init__(self, producer):
+ self._producer = producer
+
+ def stopProxying(self):
+ """
+ Stop forwarding calls of L{twisted.internet.interfaces.IPushProducer}
+ methods to the underlying L{twisted.internet.interfaces.IPushProducer}
+ provider.
+ """
+ self._producer = None
+
+ def stopProducing(self):
+ """
+ Proxy the stoppage to the underlying producer, unless this proxy has
+ been stopped.
+ """
+ if self._producer is not None:
+ self._producer.stopProducing()
+
+ def resumeProducing(self):
+ """
+ Proxy the resumption to the underlying producer, unless this proxy has
+ been stopped.
+ """
+ if self._producer is not None:
+ self._producer.resumeProducing()
+
+ def pauseProducing(self):
+ """
+ Proxy the pause to the underlying producer, unless this proxy has been
+ stopped.
+ """
+ if self._producer is not None:
+ self._producer.pauseProducing()
+
+ def loseConnection(self):
+ """
+ Proxy the request to lose the connection to the underlying producer,
+ unless this proxy has been stopped.
+ """
+ if self._producer is not None:
+ self._producer.loseConnection()
+
+
+class HTTP11ClientProtocol(Protocol):
+ """
+ L{HTTP11ClientProtocol} is an implementation of the HTTP 1.1 client
+ protocol. It supports as few features as possible.
+
+ @ivar _parser: After a request is issued, the L{HTTPClientParser} to
+ which received data making up the response to that request is
+ delivered.
+
+ @ivar _finishedRequest: After a request is issued, the L{Deferred} which
+ will fire when a L{Response} object corresponding to that request is
+ available. This allows L{HTTP11ClientProtocol} to fail the request
+ if there is a connection or parsing problem.
+
+ @ivar _currentRequest: After a request is issued, the L{Request}
+ instance used to make that request. This allows
+ L{HTTP11ClientProtocol} to stop request generation if necessary (for
+ example, if the connection is lost).
+
+ @ivar _transportProxy: After a request is issued, the
+ L{TransportProxyProducer} to which C{_parser} is connected. This
+ allows C{_parser} to pause and resume the transport in a way which
+ L{HTTP11ClientProtocol} can exert some control over.
+
+ @ivar _responseDeferred: After a request is issued, the L{Deferred} from
+ C{_parser} which will fire with a L{Response} when one has been
+ received. This is eventually chained with C{_finishedRequest}, but
+ only in certain cases to avoid double firing that Deferred.
+
+ @ivar _state: Indicates what state this L{HTTP11ClientProtocol} instance
+ is in with respect to transmission of a request and reception of a
+ response. This may be one of the following strings:
+
+ - QUIESCENT: This is the state L{HTTP11ClientProtocol} instances
+ start in. Nothing is happening: no request is being sent and no
+ response is being received or expected.
+
+ - TRANSMITTING: When a request is made (via L{request}), the
+ instance moves to this state. L{Request.writeTo} has been used
+ to start to send a request but it has not yet finished.
+
+ - TRANSMITTING_AFTER_RECEIVING_RESPONSE: The server has returned a
+ complete response but the request has not yet been fully sent
+ yet. The instance will remain in this state until the request
+ is fully sent.
+
+ - GENERATION_FAILED: There was an error while the request. The
+ request was not fully sent to the network.
+
+ - WAITING: The request was fully sent to the network. The
+ instance is now waiting for the response to be fully received.
+
+ - ABORTING: Application code has requested that the HTTP connection
+ be aborted.
+
+ - CONNECTION_LOST: The connection has been lost.
+ @type _state: C{str}
+
+ @ivar _abortDeferreds: A list of C{Deferred} instances that will fire when
+ the connection is lost.
+ """
+
+ _state = "QUIESCENT"
+ _parser = None
+ _finishedRequest = None
+ _currentRequest = None
+ _transportProxy = None
+ _responseDeferred = None
+ _log = Logger()
+
+ def __init__(self, quiescentCallback=lambda c: None):
+ self._quiescentCallback = quiescentCallback
+ self._abortDeferreds = []
+
+ @property
+ def state(self):
+ return self._state
+
+ def request(self, request):
+ """
+ Issue C{request} over C{self.transport} and return a L{Deferred} which
+ will fire with a L{Response} instance or an error.
+
+ @param request: The object defining the parameters of the request to
+ issue.
+ @type request: L{Request}
+
+ @rtype: L{Deferred}
+ @return: The deferred may errback with L{RequestGenerationFailed} if
+ the request was not fully written to the transport due to a local
+ error. It may errback with L{RequestTransmissionFailed} if it was
+ not fully written to the transport due to a network error. It may
+ errback with L{ResponseFailed} if the request was sent (not
+ necessarily received) but some or all of the response was lost. It
+ may errback with L{RequestNotSent} if it is not possible to send
+ any more requests using this L{HTTP11ClientProtocol}.
+ """
+ if self._state != "QUIESCENT":
+ return fail(RequestNotSent())
+
+ self._state = "TRANSMITTING"
+ _requestDeferred = maybeDeferred(request.writeTo, self.transport)
+
+ def cancelRequest(ign):
+ # Explicitly cancel the request's deferred if it's still trying to
+ # write when this request is cancelled.
+ if self._state in ("TRANSMITTING", "TRANSMITTING_AFTER_RECEIVING_RESPONSE"):
+ _requestDeferred.cancel()
+ else:
+ self.transport.abortConnection()
+ self._disconnectParser(Failure(CancelledError()))
+
+ self._finishedRequest = Deferred(cancelRequest)
+
+ # Keep track of the Request object in case we need to call stopWriting
+ # on it.
+ self._currentRequest = request
+
+ self._transportProxy = TransportProxyProducer(self.transport)
+ self._parser = HTTPClientParser(request, self._finishResponse)
+ self._parser.makeConnection(self._transportProxy)
+ self._responseDeferred = self._parser._responseDeferred
+
+ def cbRequestWritten(ignored):
+ if self._state == "TRANSMITTING":
+ self._state = "WAITING"
+ self._responseDeferred.chainDeferred(self._finishedRequest)
+
+ def ebRequestWriting(err):
+ if self._state == "TRANSMITTING":
+ self._state = "GENERATION_FAILED"
+ self.transport.abortConnection()
+ self._finishedRequest.errback(Failure(RequestGenerationFailed([err])))
+ else:
+ self._log.failure(
+ "Error writing request, but not in valid state "
+ "to finalize request: {state}",
+ failure=err,
+ state=self._state,
+ )
+
+ _requestDeferred.addCallbacks(cbRequestWritten, ebRequestWriting)
+
+ return self._finishedRequest
+
+ def _finishResponse(self, rest):
+ """
+ Called by an L{HTTPClientParser} to indicate that it has parsed a
+ complete response.
+
+ @param rest: A C{bytes} giving any trailing bytes which were given to
+ the L{HTTPClientParser} which were not part of the response it
+ was parsing.
+ """
+
+ _finishResponse = makeStatefulDispatcher("finishResponse", _finishResponse)
+
+ def _finishResponse_WAITING(self, rest):
+ # Currently the rest parameter is ignored. Don't forget to use it if
+ # we ever add support for pipelining. And maybe check what trailers
+ # mean.
+ if self._state == "WAITING":
+ self._state = "QUIESCENT"
+ else:
+ # The server sent the entire response before we could send the
+ # whole request. That sucks. Oh well. Fire the request()
+ # Deferred with the response. But first, make sure that if the
+ # request does ever finish being written that it won't try to fire
+ # that Deferred.
+ self._state = "TRANSMITTING_AFTER_RECEIVING_RESPONSE"
+ self._responseDeferred.chainDeferred(self._finishedRequest)
+
+ # This will happen if we're being called due to connection being lost;
+ # if so, no need to disconnect parser again, or to call
+ # _quiescentCallback.
+ if self._parser is None:
+ return
+
+ reason = ConnectionDone("synthetic!")
+ connHeaders = self._parser.connHeaders.getRawHeaders(b"connection", ())
+ if (
+ (b"close" in connHeaders)
+ or self._state != "QUIESCENT"
+ or not self._currentRequest.persistent
+ ):
+ self._giveUp(Failure(reason))
+ else:
+ # Just in case we had paused the transport, resume it before
+ # considering it quiescent again.
+ self.transport.resumeProducing()
+
+ # We call the quiescent callback first, to ensure connection gets
+ # added back to connection pool before we finish the request.
+ try:
+ self._quiescentCallback(self)
+ except BaseException:
+ # If callback throws exception, just log it and disconnect;
+ # keeping persistent connections around is an optimisation:
+ self._log.failure("")
+ self.transport.loseConnection()
+ self._disconnectParser(reason)
+
+ _finishResponse_TRANSMITTING = _finishResponse_WAITING
+
+ def _disconnectParser(self, reason):
+ """
+ If there is still a parser, call its C{connectionLost} method with the
+ given reason. If there is not, do nothing.
+
+ @type reason: L{Failure}
+ """
+ if self._parser is not None:
+ parser = self._parser
+ self._parser = None
+ self._currentRequest = None
+ self._finishedRequest = None
+ self._responseDeferred = None
+
+ # The parser is no longer allowed to do anything to the real
+ # transport. Stop proxying from the parser's transport to the real
+ # transport before telling the parser it's done so that it can't do
+ # anything.
+ self._transportProxy.stopProxying()
+ self._transportProxy = None
+ parser.connectionLost(reason)
+
+ def _giveUp(self, reason):
+ """
+ Lose the underlying connection and disconnect the parser with the given
+ L{Failure}.
+
+ Use this method instead of calling the transport's loseConnection
+ method directly otherwise random things will break.
+ """
+ self.transport.loseConnection()
+ self._disconnectParser(reason)
+
+ def dataReceived(self, bytes):
+ """
+ Handle some stuff from some place.
+ """
+ try:
+ self._parser.dataReceived(bytes)
+ except BaseException:
+ self._giveUp(Failure())
+
+ def connectionLost(self, reason):
+ """
+ The underlying transport went away. If appropriate, notify the parser
+ object.
+ """
+
+ connectionLost = makeStatefulDispatcher("connectionLost", connectionLost)
+
+ def _connectionLost_QUIESCENT(self, reason):
+ """
+ Nothing is currently happening. Move to the C{'CONNECTION_LOST'}
+ state but otherwise do nothing.
+ """
+ self._state = "CONNECTION_LOST"
+
+ def _connectionLost_GENERATION_FAILED(self, reason):
+ """
+ The connection was in an inconsistent state. Move to the
+ C{'CONNECTION_LOST'} state but otherwise do nothing.
+ """
+ self._state = "CONNECTION_LOST"
+
+ def _connectionLost_TRANSMITTING(self, reason):
+ """
+ Fail the L{Deferred} for the current request, notify the request
+ object that it does not need to continue transmitting itself, and
+ move to the C{'CONNECTION_LOST'} state.
+ """
+ self._state = "CONNECTION_LOST"
+ self._finishedRequest.errback(Failure(RequestTransmissionFailed([reason])))
+ del self._finishedRequest
+
+ # Tell the request that it should stop bothering now.
+ self._currentRequest.stopWriting()
+
+ def _connectionLost_TRANSMITTING_AFTER_RECEIVING_RESPONSE(self, reason):
+ """
+ Move to the C{'CONNECTION_LOST'} state.
+ """
+ self._state = "CONNECTION_LOST"
+
+ def _connectionLost_WAITING(self, reason):
+ """
+ Disconnect the response parser so that it can propagate the event as
+ necessary (for example, to call an application protocol's
+ C{connectionLost} method, or to fail a request L{Deferred}) and move
+ to the C{'CONNECTION_LOST'} state.
+ """
+ self._disconnectParser(reason)
+ self._state = "CONNECTION_LOST"
+
+ def _connectionLost_ABORTING(self, reason):
+ """
+ Disconnect the response parser with a L{ConnectionAborted} failure, and
+ move to the C{'CONNECTION_LOST'} state.
+ """
+ self._disconnectParser(Failure(ConnectionAborted()))
+ self._state = "CONNECTION_LOST"
+ for d in self._abortDeferreds:
+ d.callback(None)
+ self._abortDeferreds = []
+
+ def abort(self):
+ """
+ Close the connection and cause all outstanding L{request} L{Deferred}s
+ to fire with an error.
+ """
+ if self._state == "CONNECTION_LOST":
+ return succeed(None)
+ self.transport.loseConnection()
+ self._state = "ABORTING"
+ d = Deferred()
+ self._abortDeferreds.append(d)
+ return d