0
# Sessions and Streaming
1
2
This module provides a session class with base URL support and a streaming iterator for uploading large request bodies of known size, giving finer control over the HTTP request lifecycle and data streaming.
3
4
## Capabilities
5
6
### Base URL Sessions
7
8
Session class that resolves every request URL against a configured base URL (relative URLs are joined to it; absolute URLs are used unchanged), simplifying API client development.
9
10
```python { .api }
11
class BaseUrlSession:
12
"""
13
Session with automatic base URL handling for all requests.
14
15
Parameters:
16
- base_url: str, base URL for all requests (optional)
17
"""
18
def __init__(self, base_url=None): ...
19
20
def request(self, method, url, *args, **kwargs):
21
"""
22
Send request with base URL automatically prepended.
23
24
Parameters:
25
- method: str, HTTP method
26
- url: str, relative or absolute URL
27
- *args, **kwargs: standard requests arguments
28
29
Returns:
30
Response: HTTP response object
31
"""
32
33
def prepare_request(self, request, *args, **kwargs):
34
"""
35
Prepare request with base URL handling.
36
37
Parameters:
38
- request: Request object to prepare
39
- *args, **kwargs: preparation arguments
40
41
Returns:
42
PreparedRequest: prepared request with full URL
43
"""
44
45
def create_url(self, url):
46
"""
47
Create full URL from base URL and relative path.
48
49
Parameters:
50
- url: str, relative or absolute URL
51
52
Returns:
53
str: complete URL
54
"""
55
```
56
57
#### Usage Examples
58
59
```python
60
import requests
61
from requests_toolbelt.sessions import BaseUrlSession
62
63
# API client with base URL
64
api_session = BaseUrlSession(base_url='https://api.example.com/v1/')
65
66
# All requests automatically use base URL
67
users = api_session.get('users/') # GET https://api.example.com/v1/users/
68
user = api_session.get('users/123') # GET https://api.example.com/v1/users/123
69
api_session.post('users/', json={'name': 'John'}) # POST https://api.example.com/v1/users/
70
71
# Absolute URLs still work
72
external = api_session.get('https://external-api.com/data')
73
74
# Trailing slash behavior
75
session1 = BaseUrlSession('https://api.example.com/v1/')
76
session2 = BaseUrlSession('https://api.example.com/v1')
77
78
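# Not equivalent: URLs are joined with urljoin semantics, so without the trailing slash 'users' replaces the last path segment ('v1')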
79
response1 = session1.get('users')
80
response2 = session2.get('users')
81
82
# Leading slash changes behavior (absolute path)
83
api_session = BaseUrlSession('https://api.example.com/v1/')
84
relative = api_session.get('users') # https://api.example.com/v1/users
85
absolute = api_session.get('/users') # https://api.example.com/users (note: no /v1/)
86
87
# Custom URL creation
88
class CustomBaseUrlSession(BaseUrlSession):
89
def create_url(self, url):
90
"""Custom URL creation with version parameter."""
91
if not url.startswith(('http://', 'https://')):
92
# Add version parameter to all relative URLs
93
separator = '&' if '?' in url else '?'
94
url = f"{url}{separator}version=2"
95
return super().create_url(url)
96
97
custom_session = CustomBaseUrlSession('https://api.example.com/')
98
response = custom_session.get('data') # GET https://api.example.com/data?version=2
99
```
100
101
### Streaming Iterator
102
103
Iterator interface for streaming large uploads with known sizes without using chunked transfer encoding.
104
105
```python { .api }
106
class StreamingIterator:
107
"""
108
Iterator for streaming data with known size to avoid chunked encoding.
109
110
Parameters:
111
- size: int, total size of data in bytes (must be positive)
112
- iterator: iterator yielding data chunks or file-like object with read method
113
- encoding: str, character encoding (default: 'utf-8')
114
"""
115
def __init__(self, size, iterator, encoding='utf-8'): ...
116
117
# Instance attributes
118
size: int # Expected size of the upload
119
len: int # Attribute that requests checks for body length
120
encoding: str # Encoding the input data is using
121
iterator: any # The iterator used to generate upload data
122
123
def read(self, n=-1):
124
"""
125
Read up to n bytes from the iterator.
126
127
Parameters:
128
- n: int, maximum number of bytes to read (-1 reads all remaining data)
129
130
Returns:
131
bytes: data chunk
132
"""
133
134
def __len__(self):
135
"""
136
Return total size of data.
137
138
Returns:
139
int: total size in bytes
140
"""
141
142
def __iter__(self):
143
"""Return iterator interface."""
144
```
145
146
#### Usage Examples
147
148
```python
149
import requests
150
from requests_toolbelt import StreamingIterator
151
152
# Stream large file with known size
153
def file_chunks(filename, chunk_size=8192):
154
"""Generator that yields file chunks."""
155
with open(filename, 'rb') as f:
156
while True:
157
chunk = f.read(chunk_size)
158
if not chunk:
159
break
160
yield chunk
161
162
# Get file size
163
import os
164
file_size = os.path.getsize('large_file.dat')
165
166
# Create streaming iterator
167
stream = StreamingIterator(file_size, file_chunks('large_file.dat'))
168
169
# Upload without chunked encoding
170
response = requests.post(
171
'https://upload.example.com/files',
172
data=stream,
173
headers={'Content-Length': str(len(stream))}
174
)
175
176
# Stream generated data
177
def generate_data():
178
"""Generator that creates data on the fly."""
179
for i in range(1000):
180
yield f"Line {i}: {'x' * 100}\\n".encode('utf-8')
181
182
# Calculate total size
183
data_size = sum(len(f"Line {i}: {'x' * 100}\\n".encode('utf-8')) for i in range(1000))
184
185
stream = StreamingIterator(data_size, generate_data())
186
187
response = requests.put(
188
'https://api.example.com/data',
189
data=stream,
190
headers={
191
'Content-Type': 'text/plain',
192
'Content-Length': str(len(stream))
193
}
194
)
195
196
# Stream with progress monitoring
197
def monitored_file_stream(filename, progress_callback=None):
198
"""Stream file with progress updates."""
199
file_size = os.path.getsize(filename)
200
bytes_sent = 0
201
202
def chunks_with_progress():
203
nonlocal bytes_sent
204
with open(filename, 'rb') as f:
205
while True:
206
chunk = f.read(8192)
207
if not chunk:
208
break
209
bytes_sent += len(chunk)
210
if progress_callback:
211
progress_callback(bytes_sent, file_size)
212
yield chunk
213
214
return StreamingIterator(file_size, chunks_with_progress())
215
216
def upload_progress(sent, total):
217
percent = (sent / total) * 100
218
print(f"\\rUpload progress: {percent:.1f}% ({sent}/{total} bytes)", end='')
219
220
# Upload with progress
221
stream = monitored_file_stream('large_video.mp4', upload_progress)
222
response = requests.post(
223
'https://upload.service.com/video',
224
data=stream,
225
headers={'Content-Length': str(len(stream))}
226
)
227
print("\\nUpload complete!")
228
```
229
230
### Advanced Session Patterns
231
232
```python
233
from requests_toolbelt.sessions import BaseUrlSession
234
from requests_toolbelt.utils import user_agent
235
from requests_toolbelt import GuessAuth
236
237
class APIClient(BaseUrlSession):
238
"""Enhanced API client with authentication and error handling."""
239
240
def __init__(self, base_url, api_key=None, username=None, password=None):
241
super().__init__(base_url)
242
243
# Set user agent
244
self.headers['User-Agent'] = user_agent('api-client', '1.0')
245
246
# Configure authentication
247
if api_key:
248
self.headers['Authorization'] = f'Bearer {api_key}'
249
elif username and password:
250
self.auth = GuessAuth(username, password)
251
252
def request(self, method, url, **kwargs):
253
"""Override to add error handling."""
254
response = super().request(method, url, **kwargs)
255
256
# Handle common errors
257
if response.status_code == 401:
258
raise Exception("Authentication failed")
259
elif response.status_code == 429:
260
raise Exception("Rate limit exceeded")
261
elif response.status_code >= 500:
262
raise Exception(f"Server error: {response.status_code}")
263
264
return response
265
266
# Usage
267
client = APIClient(
268
'https://api.myservice.com/v2/',
269
api_key='your-api-key-here'
270
)
271
272
try:
273
users = client.get('users').json()
274
user = client.post('users', json={'name': 'John', 'email': 'john@example.com'}).json()
275
print(f"Created user: {user}")
276
except Exception as e:
277
print(f"API error: {e}")
278
279
# Multi-environment client
280
class MultiEnvAPIClient:
281
"""API client supporting multiple environments."""
282
283
def __init__(self, environment='production'):
284
base_urls = {
285
'development': 'https://dev-api.example.com/',
286
'staging': 'https://staging-api.example.com/',
287
'production': 'https://api.example.com/'
288
}
289
290
self.session = BaseUrlSession(base_urls[environment])
291
self.session.headers['User-Agent'] = user_agent('multi-env-client', '1.0')
292
293
def get_user(self, user_id):
294
return self.session.get(f'users/{user_id}').json()
295
296
def create_user(self, user_data):
297
return self.session.post('users', json=user_data).json()
298
299
# Usage
300
dev_client = MultiEnvAPIClient('development')
301
prod_client = MultiEnvAPIClient('production')
302
303
# Same interface, different endpoints
304
dev_user = dev_client.get_user(123)
305
prod_user = prod_client.get_user(123)
306
```
307
308
### Memory-Efficient Streaming
309
310
```python
311
import requests
312
from requests_toolbelt import StreamingIterator
313
import hashlib
314
315
def stream_with_checksum(data_iterator, total_size):
316
"""Stream data while calculating checksum."""
317
hasher = hashlib.sha256()
318
319
def chunks_with_hash():
320
for chunk in data_iterator:
321
hasher.update(chunk)
322
yield chunk
323
324
stream = StreamingIterator(total_size, chunks_with_hash())
325
326
# Upload data
327
response = requests.post(
328
'https://upload.example.com/verify',
329
data=stream,
330
headers={
331
'Content-Length': str(total_size),
332
'Content-Type': 'application/octet-stream'
333
}
334
)
335
336
# Return response and checksum
337
return response, hasher.hexdigest()
338
339
# Usage with large file
340
def large_file_iterator(filename):
341
with open(filename, 'rb') as f:
342
while True:
343
chunk = f.read(64 * 1024) # 64KB chunks
344
if not chunk:
345
break
346
yield chunk
347
348
file_size = os.path.getsize('large_database_backup.sql')
349
response, checksum = stream_with_checksum(
350
large_file_iterator('large_database_backup.sql'),
351
file_size
352
)
353
354
print(f"Upload completed: {response.status_code}")
355
print(f"SHA256 checksum: {checksum}")
356
```