requests_toolbelt.threaded — a utility belt module for advanced users of python-requests.

Thread pool implementation for executing multiple HTTP requests concurrently, with session reuse, customizable initialization, and comprehensive error handling. It lets you execute multiple requests concurrently through a simple interface.
def map(requests, **kwargs):
    """
    Execute multiple requests in parallel using a thread pool.

    Parameters:
    - requests: list of dict, each containing request parameters
    - num_processes: int, number of worker threads (default: CPU count)
    - initializer: callable, function to initialize each session
    - initargs: tuple, arguments for initializer function

    Returns:
    tuple: (responses, exceptions) - generators for successful and failed requests
    """
    ...  # API stub; real implementation lives in requests_toolbelt.threaded


# Example usage:
from requests_toolbelt import threaded
# Simple parallel GET requests
urls_to_get = [
    {'url': 'https://api.github.com/users/octocat', 'method': 'GET'},
    {'url': 'https://api.github.com/users/defunkt', 'method': 'GET'},
    {'url': 'https://api.github.com/repos/requests/requests', 'method': 'GET'},
]
# threaded.map returns two generators: successes and failures
responses, exceptions = threaded.map(urls_to_get)
for response in responses:
    print(f"Status: {response.status_code}, URL: {response.url}")
for exception in exceptions:
    print(f"Error: {exception}")

# Mixed request methods with parameters
requests_to_make = [
    {
        'url': 'https://httpbin.org/get',
        'method': 'GET',
        'params': {'key': 'value'}
    },
    {
        'url': 'https://httpbin.org/post',
        'method': 'POST',
        'json': {'data': 'test'}
    },
    {
        'url': 'https://httpbin.org/put',
        'method': 'PUT',
        'data': 'raw data'
    }
]
responses, exceptions = threaded.map(requests_to_make, num_processes=5)

# Custom number of threads
responses, exceptions = threaded.map(urls_to_get, num_processes=10)

# Customize session configuration for all threads.
from requests_toolbelt import threaded, user_agent


def setup_session(session):
    """Initialize session with custom settings."""
    session.headers['User-Agent'] = user_agent('my-scraper', '1.0')
    session.headers['Accept'] = 'application/json'
    session.timeout = 30


urls = [
    {'url': 'https://api.example.com/data/1', 'method': 'GET'},
    {'url': 'https://api.example.com/data/2', 'method': 'GET'},
    {'url': 'https://api.example.com/data/3', 'method': 'GET'},
]
# Each worker thread's session is passed through the initializer once
responses, exceptions = threaded.map(
    urls,
    initializer=setup_session,
    num_processes=3
)


# Session with authentication
def setup_authenticated_session(session, api_key):
    """Setup session with API key authentication."""
    session.headers['Authorization'] = f'Bearer {api_key}'
    session.headers['Content-Type'] = 'application/json'


# initargs are forwarded to the initializer after the session argument
responses, exceptions = threaded.map(
    requests_to_make,
    initializer=setup_authenticated_session,
    initargs=('your-api-key-here',),
    num_processes=5
)

# Direct access to the thread pool for more control over execution.
class Pool:
    """
    Thread pool for parallel HTTP requests.

    Parameters:
    - num_processes: int, number of worker threads (default: CPU count)
    - initializer: callable, function to initialize each session
    - initargs: tuple, arguments for initializer function
    - job_queue: Queue, custom job queue (optional)
    """

    # API stub; signatures mirror requests_toolbelt.threaded.pool.Pool
    def __init__(self, num_processes=None, initializer=None,
                 initargs=None, job_queue=None): ...

    def responses(self):
        """
        Generator yielding successful responses.

        Yields:
        ThreadResponse: wrapped response objects
        """

    def exceptions(self):
        """
        Generator yielding exceptions from failed requests.

        Yields:
        ThreadException: wrapped exception objects
        """

    def join_all(self):
        """Wait for all threads to complete."""
class ThreadResponse:
    """Wrapper for successful HTTP responses."""

    # API stub; wraps the underlying requests.Response
    def __init__(self, response): ...
class ThreadException:
    """Wrapper for failed HTTP requests."""

    # API stub; carries the raised exception and the original request dict
    def __init__(self, exception, request): ...


# Example usage:
from requests_toolbelt.threaded.pool import Pool
import queue

# Create job queue
job_queue = queue.Queue()
requests_to_make = [
    {'url': 'https://api.example.com/slow-endpoint', 'method': 'GET'},
    {'url': 'https://api.example.com/fast-endpoint', 'method': 'GET'},
    {'url': 'https://api.example.com/medium-endpoint', 'method': 'GET'},
]
for request in requests_to_make:
    job_queue.put(request)

# Create and use pool directly
pool = Pool(num_processes=2, job_queue=job_queue)

# Process responses as they complete
for response in pool.responses():
    print(f"Completed: {response.url} - Status: {response.status_code}")
    print(f"Response time: {response.elapsed.total_seconds()}s")

# Handle any exceptions
for exception in pool.exceptions():
    print(f"Failed request: {exception.request}")
    print(f"Exception: {exception.exception}")

pool.join_all()

# Individual thread workers for custom threading scenarios.
class SessionThread:
    """
    Individual thread worker for HTTP requests.

    Parameters:
    - job_queue: Queue, queue containing request jobs
    - response_queue: Queue, queue for successful responses
    - exception_queue: Queue, queue for exceptions
    - initializer: callable, session initialization function
    - initargs: tuple, arguments for initializer
    """

    # API stub; see requests_toolbelt.threaded.thread.SessionThread
    def __init__(self, job_queue, response_queue, exception_queue,
                 initializer=None, initargs=None): ...


# Example usage:
from requests_toolbelt import threaded
import time


def monitor_parallel_requests():
    """Example of monitoring parallel requests with error handling."""
    urls = [
        {'url': 'https://httpbin.org/delay/1', 'method': 'GET'},
        # NOTE(review): a 404 still yields a response object, not an
        # exception — only connection-level failures land in `exceptions`
        {'url': 'https://httpbin.org/status/404', 'method': 'GET'},
        {'url': 'https://httpbin.org/delay/2', 'method': 'GET'},
        {'url': 'https://invalid-url-example.com', 'method': 'GET'},  # Will fail
    ]
    start_time = time.time()
    responses, exceptions = threaded.map(urls, num_processes=4)
    successful_count = 0
    error_count = 0
    print("Successful responses:")
    for response in responses:
        successful_count += 1
        print(f"  {response.url}: {response.status_code}")
    print("\nErrors:")
    for exception in exceptions:
        error_count += 1
        print(f"  {exception.request.get('url', 'Unknown')}: {exception.exception}")
    total_time = time.time() - start_time
    print(f"\nCompleted in {total_time:.2f}s")
    print(f"Success: {successful_count}, Errors: {error_count}")


# Usage
monitor_parallel_requests()

# Example usage:
from requests_toolbelt import threaded
import time


def batch_with_rate_limit(urls, batch_size=10, delay_between_batches=1):
    """Process URLs in batches with rate limiting.

    Parameters:
    - urls: list of request dicts accepted by threaded.map
    - batch_size: int, number of requests per batch
    - delay_between_batches: int/float, seconds to sleep between batches

    Returns:
    list of dicts, each with 'responses' and 'exceptions' lists per batch
    """

    def setup_session(session):
        # Shared per-thread session configuration for every batch
        session.timeout = 30
        session.headers['User-Agent'] = 'Batch Processor 1.0'

    results = []
    for i in range(0, len(urls), batch_size):
        batch = urls[i:i + batch_size]
        print(f"Processing batch {i//batch_size + 1} ({len(batch)} requests)")
        responses, exceptions = threaded.map(
            batch,
            initializer=setup_session,
            num_processes=min(5, len(batch))
        )
        batch_results = {
            'responses': list(responses),
            'exceptions': list(exceptions)
        }
        results.append(batch_results)
        if i + batch_size < len(urls):  # Don't delay after last batch
            time.sleep(delay_between_batches)
    return results


# Usage
urls = [{'url': f'https://httpbin.org/delay/{i%3}', 'method': 'GET'} for i in range(50)]
results = batch_with_rate_limit(urls, batch_size=10, delay_between_batches=2)

# Install with Tessl CLI
# Install with the Tessl CLI: npx tessl i tessl/pypi-requests-toolbelt