# Streaming API

Stream request and response content to handle large files efficiently without loading everything into memory.

## Overview

httpx provides comprehensive streaming capabilities for both requests and responses. This is essential for handling large files, real-time data, or when memory usage is a concern.
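For the request direction, httpx accepts a bytes iterator or generator as `content=`, sending the body chunk by chunk rather than buffering it. A minimal upload sketch (the endpoint URL and chunk size here are illustrative assumptions):

```python
import httpx

def file_chunks(path, chunk_size=8192):
    # Yield the file as a stream of byte chunks so the whole body is
    # never resident in memory at once.
    with open(path, 'rb') as f:
        while chunk := f.read(chunk_size):
            yield chunk

# Passing a generator as `content=` streams the request body;
# httpx uses chunked transfer encoding when the length is unknown.
response = httpx.post('https://example.com/upload', content=file_chunks('large-file.zip'))
print(response.status_code)
```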
## Capabilities

### Response Streaming
```python { .api }
def stream(
    method: str,
    url: URL | str,
    *,
    params: QueryParamTypes | None = None,
    content: RequestContent | None = None,
    data: RequestData | None = None,
    files: RequestFiles | None = None,
    json: Any | None = None,
    headers: HeaderTypes | None = None,
    cookies: CookieTypes | None = None,
    auth: AuthTypes | None = None,
    proxy: ProxyTypes | None = None,
    timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
    follow_redirects: bool = False,
    verify: ssl.SSLContext | str | bool = True,
    trust_env: bool = True,
) -> Iterator[Response]:
    """
    Send a request and stream the response without loading its body into memory.

    Args:
        method (str): HTTP method
        url (URL | str): URL for the request
        params (QueryParamTypes, optional): Query parameters
        content (RequestContent, optional): Raw bytes content
        data (RequestData, optional): Form data
        files (RequestFiles, optional): Files to upload
        json (Any, optional): JSON-serializable object
        headers (HeaderTypes, optional): HTTP headers
        cookies (CookieTypes, optional): Cookies
        auth (AuthTypes, optional): Authentication
        proxy (ProxyTypes, optional): Proxy URL
        timeout (TimeoutTypes): Timeout configuration
        follow_redirects (bool): Whether to follow redirects
        verify (ssl.SSLContext | str | bool): SSL verification
        trust_env (bool): Whether to use environment variables for configuration

    Returns:
        Iterator[Response]: Context manager yielding a streaming response

    Usage:
        with httpx.stream('GET', url) as response:
            for chunk in response.iter_bytes():
                process_chunk(chunk)
    """
```
### Response Streaming Methods
```python { .api }
class Response:
    def iter_bytes(self, chunk_size: int | None = None) -> Iterator[bytes]:
        """
        Iterate over the response content as bytes chunks.

        Args:
            chunk_size (int, optional): Maximum size of each chunk in bytes.
                If None, chunks are yielded as they are received.

        Yields:
            bytes: Chunks of response content

        Note:
            Content is decoded (decompressed) if a Content-Encoding is present.
        """

    async def aiter_bytes(self, chunk_size: int | None = None) -> AsyncIterator[bytes]:
        """
        Iterate over the response content as bytes chunks (async).

        Args:
            chunk_size (int, optional): Maximum size of each chunk in bytes.
                If None, chunks are yielded as they are received.

        Yields:
            bytes: Chunks of response content
        """

    def iter_text(self, chunk_size: int | None = None) -> Iterator[str]:
        """
        Iterate over the response content as text chunks.

        Args:
            chunk_size (int, optional): Maximum size of each chunk before decoding.
                If None, chunks are yielded as they are received.

        Yields:
            str: Text chunks of response content

        Note:
            Content is decoded using the response's character encoding.
        """

    async def aiter_text(self, chunk_size: int | None = None) -> AsyncIterator[str]:
        """
        Iterate over the response content as text chunks (async).

        Args:
            chunk_size (int, optional): Maximum size of each chunk before decoding.
                If None, chunks are yielded as they are received.

        Yields:
            str: Text chunks of response content
        """

    def iter_lines(self) -> Iterator[str]:
        """
        Iterate over the response content as text lines.

        Yields:
            str: Lines of response content (without line endings)

        Note:
            Handles the different line ending styles (\\n, \\r\\n, \\r).
        """

    async def aiter_lines(self) -> AsyncIterator[str]:
        """
        Iterate over the response content as text lines (async).

        Yields:
            str: Lines of response content (without line endings)
        """

    def iter_raw(self, chunk_size: int | None = None) -> Iterator[bytes]:
        """
        Iterate over the raw response content without any decoding.

        Args:
            chunk_size (int, optional): Maximum size of each chunk in bytes.
                If None, chunks are yielded as they are received.

        Yields:
            bytes: Raw chunks exactly as received from the server

        Note:
            No decompression or decoding is performed.
        """

    async def aiter_raw(self, chunk_size: int | None = None) -> AsyncIterator[bytes]:
        """
        Iterate over the raw response content without any decoding (async).

        Args:
            chunk_size (int, optional): Maximum size of each chunk in bytes.
                If None, chunks are yielded as they are received.

        Yields:
            bytes: Raw chunks exactly as received from the server
        """
```
### Client Streaming Methods
```python { .api }
class Client:
    def stream(self, method: str, url: URL | str, **kwargs) -> Iterator[Response]:
        """
        Stream a request's response using the client.

        Args:
            method (str): HTTP method
            url (URL | str): URL for the request
            **kwargs: Same arguments as Client.request()

        Returns:
            Iterator[Response]: Context manager yielding a streaming response
        """

class AsyncClient:
    def stream(self, method: str, url: URL | str, **kwargs) -> AsyncIterator[Response]:
        """
        Stream a request's response using the async client.

        Args:
            method (str): HTTP method
            url (URL | str): URL for the request
            **kwargs: Same arguments as AsyncClient.request()

        Returns:
            AsyncIterator[Response]: Async context manager yielding a streaming response
        """
```
## Usage Examples

### Basic Response Streaming
```python
import httpx

# Stream a large file download to disk
with httpx.stream('GET', 'https://example.com/large-file.zip') as response:
    with open('large-file.zip', 'wb') as f:
        for chunk in response.iter_bytes():
            f.write(chunk)
```
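Using `stream()` as a context manager ensures the response is closed and its connection released even if iteration stops early or an exception is raised.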
### Streaming with Client
```python
import httpx

with httpx.Client() as client:
    with client.stream('GET', 'https://example.com/large-file.zip') as response:
        print(f"Content-Length: {response.headers.get('content-length')}")

        total_size = 0
        with open('large-file.zip', 'wb') as f:
            for chunk in response.iter_bytes(chunk_size=8192):
                f.write(chunk)
                total_size += len(chunk)

        print(f"Downloaded: {total_size} bytes")
```
### Text Streaming
```python
import httpx

def process_line(line):
    # Process each line without loading the entire file
    print(f"Line: {line}")

# Stream text content line by line
with httpx.stream('GET', 'https://example.com/large-text-file.txt') as response:
    for line in response.iter_lines():
        process_line(line)
```
### JSON Streaming
```python
import httpx
import json

def process_json_object(obj):
    print(f"Processing: {obj}")

# Stream and parse JSON objects line by line (JSONL format)
with httpx.stream('GET', 'https://api.example.com/data.jsonl') as response:
    for line in response.iter_lines():
        if line.strip():  # Skip empty lines
            data = json.loads(line)
            process_json_object(data)
```
### Async Response Streaming
```python
import httpx
import asyncio

async def download_file():
    async with httpx.AsyncClient() as client:
        async with client.stream('GET', 'https://example.com/large-file.zip') as response:
            with open('large-file.zip', 'wb') as f:
                async for chunk in response.aiter_bytes():
                    f.write(chunk)

asyncio.run(download_file())
```
### Streaming with Progress
```python
import httpx

def download_with_progress(url, filename):
    with httpx.stream('GET', url) as response:
        total_size = int(response.headers.get('content-length', 0))
        downloaded = 0

        with open(filename, 'wb') as f:
            for chunk in response.iter_bytes(chunk_size=8192):
                f.write(chunk)
                downloaded += len(chunk)

                if total_size > 0:
                    percent = (downloaded / total_size) * 100
                    print(f"Progress: {percent:.1f}% ({downloaded}/{total_size} bytes)")

download_with_progress('https://example.com/file.zip', 'file.zip')
```
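If the server sends no usable `Content-Length`, httpx's own counter can still report progress: `response.num_bytes_downloaded` tracks the raw bytes received over the network (before any decompression). A minimal variant using it:

```python
import httpx

def download_with_network_progress(url, filename):
    with httpx.stream('GET', url) as response:
        with open(filename, 'wb') as f:
            for chunk in response.iter_bytes():
                f.write(chunk)
                # num_bytes_downloaded counts raw network bytes, which can
                # differ from the sum of decoded chunk lengths when the
                # response is compressed.
                print(f"Received: {response.num_bytes_downloaded} bytes")

download_with_network_progress('https://example.com/file.zip', 'file.zip')
```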
### Raw Content Streaming
```python
import httpx

# Stream raw content without decompression
with httpx.stream('GET', 'https://example.com/compressed-data.gz') as response:
    with open('compressed-data.gz', 'wb') as f:
        # iter_raw() preserves the original compression
        for chunk in response.iter_raw():
            f.write(chunk)
```
### Conditional Streaming
```python
import httpx

def smart_download(url, filename):
    with httpx.stream('GET', url) as response:
        content_length = response.headers.get('content-length')

        if content_length and int(content_length) > 10_000_000:  # 10 MB
            # Stream large files to disk chunk by chunk
            print("Large file detected, streaming...")
            with open(filename, 'wb') as f:
                for chunk in response.iter_bytes():
                    f.write(chunk)
        else:
            # Load small files into memory in one go
            print("Small file, loading into memory...")
            content = response.read()
            with open(filename, 'wb') as f:
                f.write(content)

smart_download('https://example.com/unknown-size-file', 'file.dat')
```
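This pattern works because a streamed response defers reading the body: the status and headers are available immediately, nothing is buffered until you iterate, and `response.read()` loads the remainder into memory on demand.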
### Streaming API Responses
```python
import httpx
import json

def stream_api_data(api_url):
    """Stream paginated API data, yielding items one at a time."""
    page = 1

    with httpx.Client() as client:
        while True:
            with client.stream('GET', f"{api_url}?page={page}") as response:
                # Stop on any non-success status
                if response.status_code != 200:
                    break

                # Load and parse this page's JSON body
                data = json.loads(response.read())

                if not data.get('items'):
                    break

                # Hand each item to the caller
                for item in data['items']:
                    yield item

            page += 1

# Usage
def process_item(item):
    print(f"Processing: {item}")

for item in stream_api_data('https://api.example.com/data'):
    process_item(item)
```
### Server-Sent Events (SSE) Simulation
```python
import httpx

def stream_events(url):
    """Stream server-sent events."""
    with httpx.stream('GET', url, headers={'Accept': 'text/event-stream'}) as response:
        for line in response.iter_lines():
            if line.startswith('data: '):
                event_data = line[6:]  # Remove the 'data: ' prefix
                yield event_data

# Usage
for event in stream_events('https://api.example.com/events'):
    print(f"Event: {event}")
```
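The documented `aiter_lines()` method supports the same pattern asynchronously. A sketch under the same simplifying assumption (real SSE streams also carry `event:`, `id:`, and retry fields that a production parser, or a dedicated SSE library, should handle):

```python
import asyncio
import httpx

async def stream_events_async(url):
    # timeout=None keeps the long-lived event connection open indefinitely
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream('GET', url, headers={'Accept': 'text/event-stream'}) as response:
            async for line in response.aiter_lines():
                if line.startswith('data: '):
                    print(f"Event: {line[6:]}")

asyncio.run(stream_events_async('https://api.example.com/events'))
```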
### Error Handling in Streaming
```python
import httpx

def safe_stream_download(url, filename):
    try:
        with httpx.stream('GET', url) as response:
            response.raise_for_status()  # Check status before streaming

            with open(filename, 'wb') as f:
                for chunk in response.iter_bytes():
                    f.write(chunk)

            print(f"Successfully downloaded {filename}")

    except httpx.HTTPStatusError as exc:
        # A streamed error response has not been read yet, so load the
        # body before accessing .text
        exc.response.read()
        print(f"HTTP error {exc.response.status_code}: {exc.response.text}")
    except httpx.RequestError as exc:
        print(f"Request error: {exc}")
    except OSError as exc:
        print(f"File error: {exc}")

safe_stream_download('https://example.com/file.zip', 'file.zip')
```
### Memory-Efficient JSON Processing
```python
import httpx
import json

def process_large_json_array(url):
    """Process a large newline-delimited JSON response without loading it all."""
    with httpx.stream('GET', url) as response:
        # Assumes one JSON object per line (JSONL / NDJSON)
        buffer = ""

        for chunk in response.iter_text():
            buffer += chunk

            # Process any complete lines accumulated so far
            while '\n' in buffer:
                line, buffer = buffer.split('\n', 1)
                if line.strip():
                    try:
                        obj = json.loads(line)
                        yield obj
                    except json.JSONDecodeError:
                        continue

        # Process whatever remains in the buffer
        if buffer.strip():
            try:
                obj = json.loads(buffer)
                yield obj
            except json.JSONDecodeError:
                pass

# Usage
def process_item(item):
    print(f"Processing: {item}")

for item in process_large_json_array('https://api.example.com/large-dataset'):
    process_item(item)
```
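Note that `response.iter_lines()` performs this line buffering internally, so the manual buffer above is mainly worthwhile when you need a custom delimiter or want explicit control over partial records.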
### Resumable Downloads
```python
import httpx
import os

def resumable_download(url, filename):
    """Download a file with resume capability."""
    # Check whether a partial file exists
    resume_pos = 0
    if os.path.exists(filename):
        resume_pos = os.path.getsize(filename)
        print(f"Resuming download from byte {resume_pos}")

    headers = {}
    if resume_pos > 0:
        headers['Range'] = f'bytes={resume_pos}-'

    with httpx.stream('GET', url, headers=headers) as response:
        if response.status_code == 206:  # Partial Content: server honored the Range
            mode = 'ab'
        elif response.status_code == 200:  # Server ignored the Range: restart from scratch
            mode = 'wb'
        else:
            print(f"Download failed: {response.status_code}")
            return

        with open(filename, mode) as f:
            for chunk in response.iter_bytes():
                f.write(chunk)
        print(f"Download completed: {filename}")

resumable_download('https://example.com/large-file.zip', 'large-file.zip')
```