# Parallel Requests

Thread pool implementation for executing multiple HTTP requests concurrently with session reuse, customizable initialization, and comprehensive error handling.

## Capabilities

### Simple Parallel Execution

Execute multiple requests concurrently using a simple interface.

```python { .api }
def map(requests, **kwargs):
    """
    Execute multiple requests in parallel using a thread pool.

    Parameters:
    - requests: list of dict, each containing request parameters
    - num_processes: int, number of worker threads (default: CPU count)
    - initializer: callable, function to initialize each session
    - initargs: tuple, arguments for initializer function

    Returns:
    tuple: (responses, exceptions) - generators for successful and failed requests
    """
```

#### Usage Examples

```python
from requests_toolbelt import threaded

# Simple parallel GET requests
urls_to_get = [
    {'url': 'https://api.github.com/users/octocat', 'method': 'GET'},
    {'url': 'https://api.github.com/users/defunkt', 'method': 'GET'},
    {'url': 'https://api.github.com/repos/requests/requests', 'method': 'GET'},
]

responses, exceptions = threaded.map(urls_to_get)

for response in responses:
    print(f"Status: {response.status_code}, URL: {response.url}")

for exception in exceptions:
    print(f"Error: {exception}")

# Mixed request methods with parameters
requests_to_make = [
    {
        'url': 'https://httpbin.org/get',
        'method': 'GET',
        'params': {'key': 'value'}
    },
    {
        'url': 'https://httpbin.org/post',
        'method': 'POST',
        'json': {'data': 'test'}
    },
    {
        'url': 'https://httpbin.org/put',
        'method': 'PUT',
        'data': 'raw data'
    }
]

responses, exceptions = threaded.map(requests_to_make, num_processes=5)

# Custom number of threads
responses, exceptions = threaded.map(urls_to_get, num_processes=10)
```

### Session Initialization

Customize session configuration for all threads.

```python
from requests_toolbelt import threaded, user_agent

def setup_session(session):
    """Initialize session with custom settings."""
    session.headers['User-Agent'] = user_agent('my-scraper', '1.0')
    session.headers['Accept'] = 'application/json'
    session.timeout = 30

urls = [
    {'url': 'https://api.example.com/data/1', 'method': 'GET'},
    {'url': 'https://api.example.com/data/2', 'method': 'GET'},
    {'url': 'https://api.example.com/data/3', 'method': 'GET'},
]

responses, exceptions = threaded.map(
    urls,
    initializer=setup_session,
    num_processes=3
)

# Session with authentication
def setup_authenticated_session(session, api_key):
    """Setup session with API key authentication."""
    session.headers['Authorization'] = f'Bearer {api_key}'
    session.headers['Content-Type'] = 'application/json'

responses, exceptions = threaded.map(
    requests_to_make,
    initializer=setup_authenticated_session,
    initargs=('your-api-key-here',),
    num_processes=5
)
```

### Advanced Thread Pool

Direct access to the thread pool for more control over execution.

```python { .api }
class Pool:
    """
    Thread pool for parallel HTTP requests.

    Parameters:
    - num_processes: int, number of worker threads (default: CPU count)
    - initializer: callable, function to initialize each session
    - initargs: tuple, arguments for initializer function
    - job_queue: Queue, custom job queue (optional)
    """
    def __init__(self, num_processes=None, initializer=None, initargs=None, job_queue=None): ...

    def responses(self):
        """
        Generator yielding successful responses.

        Yields:
        ThreadResponse: wrapped response objects
        """

    def exceptions(self):
        """
        Generator yielding exceptions from failed requests.

        Yields:
        ThreadException: wrapped exception objects
        """

    def join_all(self):
        """Wait for all threads to complete."""

class ThreadResponse:
    """Wrapper for successful HTTP responses."""
    def __init__(self, response): ...

class ThreadException:
    """Wrapper for failed HTTP requests."""
    def __init__(self, exception, request): ...
```

#### Usage Examples

```python
from requests_toolbelt.threaded.pool import Pool
import queue

# Create job queue
job_queue = queue.Queue()
requests_to_make = [
    {'url': 'https://api.example.com/slow-endpoint', 'method': 'GET'},
    {'url': 'https://api.example.com/fast-endpoint', 'method': 'GET'},
    {'url': 'https://api.example.com/medium-endpoint', 'method': 'GET'},
]

for request in requests_to_make:
    job_queue.put(request)

# Create and use pool directly
pool = Pool(num_processes=2, job_queue=job_queue)

# Process responses as they complete
for response in pool.responses():
    print(f"Completed: {response.url} - Status: {response.status_code}")
    print(f"Response time: {response.elapsed.total_seconds()}s")

# Handle any exceptions
for exception in pool.exceptions():
    print(f"Failed request: {exception.request}")
    print(f"Exception: {exception.exception}")

pool.join_all()
```

### Session Thread Management

Individual thread workers for custom threading scenarios.

```python { .api }
class SessionThread:
    """
    Individual thread worker for HTTP requests.

    Parameters:
    - job_queue: Queue, queue containing request jobs
    - response_queue: Queue, queue for successful responses
    - exception_queue: Queue, queue for exceptions
    - initializer: callable, session initialization function
    - initargs: tuple, arguments for initializer
    """
    def __init__(self, job_queue, response_queue, exception_queue,
                 initializer=None, initargs=None): ...
```

### Error Handling and Monitoring

```python
from requests_toolbelt import threaded
import time

def monitor_parallel_requests():
    """Example of monitoring parallel requests with error handling."""

    urls = [
        {'url': 'https://httpbin.org/delay/1', 'method': 'GET'},
        {'url': 'https://httpbin.org/status/404', 'method': 'GET'},  # Completes as a response with 404 status (not an exception)
        {'url': 'https://httpbin.org/delay/2', 'method': 'GET'},
        {'url': 'https://invalid-url-example.com', 'method': 'GET'},  # Will fail (connection error)
    ]

    start_time = time.time()
    responses, exceptions = threaded.map(urls, num_processes=4)

    successful_count = 0
    error_count = 0

    print("Successful responses:")
    for response in responses:
        successful_count += 1
        print(f"  {response.url}: {response.status_code}")

    print("\nErrors:")
    for exception in exceptions:
        error_count += 1
        print(f"  {exception.request.get('url', 'Unknown')}: {exception.exception}")

    total_time = time.time() - start_time
    print(f"\nCompleted in {total_time:.2f}s")
    print(f"Success: {successful_count}, Errors: {error_count}")

# Usage
monitor_parallel_requests()
```

### Batch Processing with Rate Limiting

```python
from requests_toolbelt import threaded
import time

def batch_with_rate_limit(urls, batch_size=10, delay_between_batches=1):
    """Process URLs in batches with rate limiting."""

    def setup_session(session):
        session.timeout = 30
        session.headers['User-Agent'] = 'Batch Processor 1.0'

    results = []

    for i in range(0, len(urls), batch_size):
        batch = urls[i:i + batch_size]
        print(f"Processing batch {i//batch_size + 1} ({len(batch)} requests)")

        responses, exceptions = threaded.map(
            batch,
            initializer=setup_session,
            num_processes=min(5, len(batch))
        )

        batch_results = {
            'responses': list(responses),
            'exceptions': list(exceptions)
        }
        results.append(batch_results)

        if i + batch_size < len(urls):  # Don't delay after the last batch
            time.sleep(delay_between_batches)

    return results

# Usage
urls = [{'url': f'https://httpbin.org/delay/{i%3}', 'method': 'GET'} for i in range(50)]
results = batch_with_rate_limit(urls, batch_size=10, delay_between_batches=2)
```