1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
|
import inspect
import warnings
from json import dumps as json_dumps
from typing import (
Any,
AsyncIterable,
AsyncIterator,
Dict,
Iterable,
Iterator,
Mapping,
Optional,
Tuple,
Union,
)
from urllib.parse import urlencode
from ._exceptions import StreamClosed, StreamConsumed
from ._multipart import MultipartStream
from ._types import (
AsyncByteStream,
RequestContent,
RequestData,
RequestFiles,
ResponseContent,
SyncByteStream,
)
from ._utils import peek_filelike_length, primitive_value_to_str
class ByteStream(AsyncByteStream, SyncByteStream):
    """
    A stream wrapping a single, fully in-memory `bytes` payload.

    Usable for both sync and async iteration. Each iteration yields the
    complete payload as exactly one chunk.
    """

    def __init__(self, stream: bytes) -> None:
        self._stream = stream

    def __iter__(self) -> Iterator[bytes]:
        """Yield the stored payload once."""
        payload = self._stream
        yield payload

    async def __aiter__(self) -> AsyncIterator[bytes]:
        """Yield the stored payload once (async variant)."""
        payload = self._stream
        yield payload
class IteratorByteStream(SyncByteStream):
    """
    A sync byte stream wrapping either a file-like object (anything with a
    `read` attribute) or a plain iterable of `bytes`.

    If the wrapped object is a generator, iterating the stream a second time
    raises `StreamConsumed`.
    """

    # Number of bytes requested per `read()` call on file-like objects.
    CHUNK_SIZE = 65_536

    def __init__(self, stream: Iterable[bytes]) -> None:
        self._stream = stream
        self._is_stream_consumed = False
        self._is_generator = inspect.isgenerator(stream)

    def __iter__(self) -> Iterator[bytes]:
        # Only generators are guarded against re-iteration.
        if self._is_generator and self._is_stream_consumed:
            raise StreamConsumed()

        self._is_stream_consumed = True

        if not hasattr(self._stream, "read"):
            # Not file-like: hand out the parts exactly as the iterable
            # produces them.
            for part in self._stream:
                yield part
            return

        # File-like interfaces are read directly, in fixed-size chunks,
        # until `read` returns a falsy (empty) result.
        while True:
            chunk = self._stream.read(self.CHUNK_SIZE)
            if not chunk:
                break
            yield chunk
class AsyncIteratorByteStream(AsyncByteStream):
    """
    An async byte stream wrapping either an async file-like object (anything
    with an `aread` attribute) or a plain async iterable of `bytes`.

    If the wrapped object is an async generator, iterating the stream a
    second time raises `StreamConsumed`.
    """

    # Number of bytes requested per `aread()` call on file-like objects.
    CHUNK_SIZE = 65_536

    def __init__(self, stream: AsyncIterable[bytes]) -> None:
        self._stream = stream
        self._is_stream_consumed = False
        self._is_generator = inspect.isasyncgen(stream)

    async def __aiter__(self) -> AsyncIterator[bytes]:
        # Only async generators are guarded against re-iteration.
        if self._is_generator and self._is_stream_consumed:
            raise StreamConsumed()

        self._is_stream_consumed = True

        if not hasattr(self._stream, "aread"):
            # Not file-like: hand out the parts exactly as the async
            # iterable produces them.
            async for part in self._stream:
                yield part
            return

        # File-like interfaces are read directly, in fixed-size chunks,
        # until `aread` returns a falsy (empty) result.
        while True:
            chunk = await self._stream.aread(self.CHUNK_SIZE)
            if not chunk:
                break
            yield chunk
class UnattachedStream(AsyncByteStream, SyncByteStream):
    """
    Placeholder stream for a request or response that has been serialized
    with pickle and is therefore no longer attached to a stream for I/O.

    Any attempt to iterate it, sync or async, raises `httpx.StreamClosed`.
    """

    def __iter__(self) -> Iterator[bytes]:
        raise StreamClosed()

    async def __aiter__(self) -> AsyncIterator[bytes]:
        raise StreamClosed()
        # Never reached; the `yield` is what makes this function an async
        # generator, so the exception surfaces on iteration.
        yield b""  # pragma: no cover
def encode_content(
    content: Union[str, bytes, Iterable[bytes], AsyncIterable[bytes]],
) -> Tuple[Dict[str, str], Union[SyncByteStream, AsyncByteStream]]:
    """
    Encode raw body content, returning a two-tuple of (<headers>, <stream>).

    * `str` / `bytes`: text is UTF-8 encoded; a `Content-Length` header is
      set only when the body is non-empty.
    * sync iterables (excluding `dict`): `Content-Length` when
      `peek_filelike_length` returns a value, otherwise chunked transfer.
    * async iterables: always chunked transfer.

    Raises `TypeError` for any other type.
    """
    if isinstance(content, (bytes, str)):
        if isinstance(content, str):
            body = content.encode("utf-8")
        else:
            body = content
        if not body:
            return {}, ByteStream(body)
        return {"Content-Length": str(len(body))}, ByteStream(body)

    # Excluding `dict` looks oddly specific, but a dict is iterable and
    # would otherwise be silently accepted as a bytes-iterable. Passing one
    # here is an easy user mistake. See issue #2491.
    if isinstance(content, Iterable) and not isinstance(content, dict):
        known_length = peek_filelike_length(content)
        if known_length is not None:
            headers = {"Content-Length": str(known_length)}
        else:
            headers = {"Transfer-Encoding": "chunked"}
        return headers, IteratorByteStream(content)  # type: ignore

    if isinstance(content, AsyncIterable):
        return {"Transfer-Encoding": "chunked"}, AsyncIteratorByteStream(content)

    raise TypeError(f"Unexpected type for 'content', {type(content)!r}")
def encode_urlencoded_data(
    data: RequestData,
) -> Tuple[Dict[str, str], ByteStream]:
    """
    Encode form data as `application/x-www-form-urlencoded`.

    List and tuple values are expanded into one key/value pair per item.
    Every value is converted with `primitive_value_to_str` before encoding.
    Returns a two-tuple of (<headers>, <stream>).
    """
    pairs = []
    for key, value in data.items():
        # Treat scalars as a one-item sequence so both cases share a path.
        items = value if isinstance(value, (list, tuple)) else [value]
        pairs.extend([(key, primitive_value_to_str(item)) for item in items])

    encoded = urlencode(pairs, doseq=True).encode("utf-8")
    headers = {
        "Content-Length": str(len(encoded)),
        "Content-Type": "application/x-www-form-urlencoded",
    }
    return headers, ByteStream(encoded)
def encode_multipart_data(
    data: RequestData, files: RequestFiles, boundary: Optional[bytes]
) -> Tuple[Dict[str, str], MultipartStream]:
    """
    Build a `MultipartStream` from the given form data, files and boundary.

    Returns a two-tuple of (<headers>, <stream>), where the headers are
    those reported by the stream's own `get_headers()`.
    """
    stream = MultipartStream(data=data, files=files, boundary=boundary)
    return stream.get_headers(), stream
def encode_text(text: str) -> Tuple[Dict[str, str], ByteStream]:
    """
    Encode `text` as a UTF-8 `text/plain` body.

    Returns a two-tuple of (<headers>, <stream>).
    """
    payload = text.encode("utf-8")
    headers = {
        "Content-Length": str(len(payload)),
        "Content-Type": "text/plain; charset=utf-8",
    }
    return headers, ByteStream(payload)
def encode_html(html: str) -> Tuple[Dict[str, str], ByteStream]:
    """
    Encode `html` as a UTF-8 `text/html` body.

    Returns a two-tuple of (<headers>, <stream>).
    """
    payload = html.encode("utf-8")
    headers = {
        "Content-Length": str(len(payload)),
        "Content-Type": "text/html; charset=utf-8",
    }
    return headers, ByteStream(payload)
def encode_json(json: Any) -> Tuple[Dict[str, str], ByteStream]:
    """
    Serialize `json` with the standard library's `json.dumps` (default
    settings) and encode the result as a UTF-8 `application/json` body.

    Returns a two-tuple of (<headers>, <stream>).
    """
    serialized = json_dumps(json)
    payload = serialized.encode("utf-8")
    headers = {
        "Content-Length": str(len(payload)),
        "Content-Type": "application/json",
    }
    return headers, ByteStream(payload)
def encode_request(
    content: Optional[RequestContent] = None,
    data: Optional[RequestData] = None,
    files: Optional[RequestFiles] = None,
    json: Optional[Any] = None,
    boundary: Optional[bytes] = None,
) -> Tuple[Dict[str, str], Union[SyncByteStream, AsyncByteStream]]:
    """
    Handles encoding the given `content`, `data`, `files`, and `json`,
    returning a two-tuple of (<headers>, <stream>).

    Precedence, first match wins:

    1. `data` that is not a mapping: treated as raw content (deprecated,
       emits a `DeprecationWarning`).
    2. `content`: raw body content.
    3. `files` (truthy): multipart form, combined with `data` if given.
    4. `data` (truthy): URL-encoded form.
    5. `json` (not None): JSON body.

    If nothing matches, returns empty headers and an empty byte stream.
    """
    if data is not None and not isinstance(data, Mapping):
        # We prefer to separate `content=<bytes|str|byte iterator|bytes aiterator>`
        # for raw request content, and `data=<form data>` for url encoded or
        # multipart form content.
        #
        # However for compat with requests, we *do* still support
        # `data=<bytes...>` usages. We deal with that case here, treating it
        # as if `content=<...>` had been supplied instead.
        message = "Use 'content=<...>' to upload raw bytes/text content."
        # stacklevel=2 attributes the warning to the code calling this
        # function rather than to this line.
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        return encode_content(data)

    if content is not None:
        return encode_content(content)
    elif files:
        # An absent or empty `data` becomes an empty form alongside the files.
        return encode_multipart_data(data or {}, files, boundary)
    elif data:
        return encode_urlencoded_data(data)
    elif json is not None:
        return encode_json(json)

    return {}, ByteStream(b"")
def encode_response(
    content: Optional[ResponseContent] = None,
    text: Optional[str] = None,
    html: Optional[str] = None,
    json: Optional[Any] = None,
) -> Tuple[Dict[str, str], Union[SyncByteStream, AsyncByteStream]]:
    """
    Encode a response body, returning a two-tuple of (<headers>, <stream>).

    The first argument that is not None wins, checked in the order
    `content`, `text`, `html`, `json`. If all are None, the result is empty
    headers and an empty byte stream.
    """
    if content is not None:
        return encode_content(content)
    if text is not None:
        return encode_text(text)
    if html is not None:
        return encode_html(html)
    if json is not None:
        return encode_json(json)
    return {}, ByteStream(b"")
|