CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-requests-toolbelt

A utility belt for advanced users of python-requests

Pending
Overview
Eval results
Files

threading.mddocs/

Parallel Requests

Thread pool implementation for executing multiple HTTP requests concurrently with session reuse, customizable initialization, and comprehensive error handling.

Capabilities

Simple Parallel Execution

Execute multiple requests concurrently using a simple interface.

def map(requests, **kwargs):
    """
    Execute multiple requests in parallel using thread pool.
    
    Parameters:
    - requests: list of dict, each containing request parameters
    - num_processes: int, number of worker threads (default: CPU count)
    - initializer: callable, function to initialize each session
    - initargs: tuple, arguments for initializer function
    
    Returns:
    tuple: (responses, exceptions) - generators for successful and failed requests
    """

Usage Examples

from requests_toolbelt import threaded

# Simple parallel GET requests
urls_to_get = [
    {'url': 'https://api.github.com/users/octocat', 'method': 'GET'},
    {'url': 'https://api.github.com/users/defunkt', 'method': 'GET'},
    {'url': 'https://api.github.com/repos/requests/requests', 'method': 'GET'},
]

responses, exceptions = threaded.map(urls_to_get)

for response in responses:
    print(f"Status: {response.status_code}, URL: {response.url}")

for exception in exceptions:
    print(f"Error: {exception}")

# Mixed request methods with parameters
requests_to_make = [
    {
        'url': 'https://httpbin.org/get',
        'method': 'GET',
        'params': {'key': 'value'}
    },
    {
        'url': 'https://httpbin.org/post',
        'method': 'POST',
        'json': {'data': 'test'}
    },
    {
        'url': 'https://httpbin.org/put',
        'method': 'PUT',
        'data': 'raw data'
    }
]

responses, exceptions = threaded.map(requests_to_make, num_processes=5)

# Custom number of threads
responses, exceptions = threaded.map(urls_to_get, num_processes=10)

Session Initialization

Customize session configuration for all threads.

from requests_toolbelt import threaded, user_agent

def setup_session(session):
    """Initialize session with custom settings."""
    session.headers['User-Agent'] = user_agent('my-scraper', '1.0')
    session.headers['Accept'] = 'application/json'
    session.timeout = 30

urls = [
    {'url': 'https://api.example.com/data/1', 'method': 'GET'},
    {'url': 'https://api.example.com/data/2', 'method': 'GET'},
    {'url': 'https://api.example.com/data/3', 'method': 'GET'},
]

responses, exceptions = threaded.map(
    urls,
    initializer=setup_session,
    num_processes=3
)

# Session with authentication
def setup_authenticated_session(session, api_key):
    """Setup session with API key authentication."""
    session.headers['Authorization'] = f'Bearer {api_key}'
    session.headers['Content-Type'] = 'application/json'

responses, exceptions = threaded.map(
    requests_to_make,
    initializer=setup_authenticated_session,
    initargs=('your-api-key-here',),
    num_processes=5
)

Advanced Thread Pool

Direct access to the thread pool for more control over execution.

class Pool:
    """
    Thread pool for parallel HTTP requests.
    
    Parameters:
    - num_processes: int, number of worker threads (default: CPU count)
    - initializer: callable, function to initialize each session
    - initargs: tuple, arguments for initializer function
    - job_queue: Queue, custom job queue (optional)
    """
    def __init__(self, num_processes=None, initializer=None, initargs=None, job_queue=None): ...
    
    def responses(self):
        """
        Generator yielding successful responses.
        
        Yields:
        ThreadResponse: wrapped response objects
        """
    
    def exceptions(self):
        """
        Generator yielding exceptions from failed requests.
        
        Yields:
        ThreadException: wrapped exception objects
        """
    
    def join_all(self):
        """Wait for all threads to complete."""

class ThreadResponse:
    """Wrapper for successful HTTP responses."""
    def __init__(self, response): ...

class ThreadException:
    """Wrapper for failed HTTP requests."""
    def __init__(self, exception, request): ...

Usage Examples

from requests_toolbelt.threaded.pool import Pool
import queue

# Create job queue
job_queue = queue.Queue()
requests_to_make = [
    {'url': 'https://api.example.com/slow-endpoint', 'method': 'GET'},
    {'url': 'https://api.example.com/fast-endpoint', 'method': 'GET'},
    {'url': 'https://api.example.com/medium-endpoint', 'method': 'GET'},
]

for request in requests_to_make:
    job_queue.put(request)

# Create and use pool directly
pool = Pool(num_processes=2, job_queue=job_queue)

# Process responses as they complete
for response in pool.responses():
    print(f"Completed: {response.url} - Status: {response.status_code}")
    print(f"Response time: {response.elapsed.total_seconds()}s")

# Handle any exceptions
for exception in pool.exceptions():
    print(f"Failed request: {exception.request}")
    print(f"Exception: {exception.exception}")

pool.join_all()

Session Thread Management

Individual thread workers for custom threading scenarios.

class SessionThread:
    """
    Individual thread worker for HTTP requests.
    
    Parameters:
    - job_queue: Queue, queue containing request jobs
    - response_queue: Queue, queue for successful responses
    - exception_queue: Queue, queue for exceptions
    - initializer: callable, session initialization function
    - initargs: tuple, arguments for initializer
    """
    def __init__(self, job_queue, response_queue, exception_queue, 
                 initializer=None, initargs=None): ...

Error Handling and Monitoring

from requests_toolbelt import threaded
import time

def monitor_parallel_requests():
    """Example of monitoring parallel requests with error handling."""
    
    urls = [
        {'url': 'https://httpbin.org/delay/1', 'method': 'GET'},
        {'url': 'https://httpbin.org/status/404', 'method': 'GET'},  # Will fail
        {'url': 'https://httpbin.org/delay/2', 'method': 'GET'},
        {'url': 'https://invalid-url-example.com', 'method': 'GET'},  # Will fail
    ]
    
    start_time = time.time()
    responses, exceptions = threaded.map(urls, num_processes=4)
    
    successful_count = 0
    error_count = 0
    
    print("Successful responses:")
    for response in responses:
        successful_count += 1
        print(f"  {response.url}: {response.status_code}")
    
    print("\\nErrors:")
    for exception in exceptions:
        error_count += 1
        print(f"  {exception.request.get('url', 'Unknown')}: {exception.exception}")
    
    total_time = time.time() - start_time
    print(f"\\nCompleted in {total_time:.2f}s")
    print(f"Success: {successful_count}, Errors: {error_count}")

# Usage
monitor_parallel_requests()

Batch Processing with Rate Limiting

from requests_toolbelt import threaded
import time

def batch_with_rate_limit(urls, batch_size=10, delay_between_batches=1):
    """Process URLs in batches with rate limiting."""
    
    def setup_session(session):
        session.timeout = 30
        session.headers['User-Agent'] = 'Batch Processor 1.0'
    
    results = []
    
    for i in range(0, len(urls), batch_size):
        batch = urls[i:i + batch_size]
        print(f"Processing batch {i//batch_size + 1} ({len(batch)} requests)")
        
        responses, exceptions = threaded.map(
            batch,
            initializer=setup_session,
            num_processes=min(5, len(batch))
        )
        
        batch_results = {
            'responses': list(responses),
            'exceptions': list(exceptions)
        }
        results.append(batch_results)
        
        if i + batch_size < len(urls):  # Don't delay after last batch
            time.sleep(delay_between_batches)
    
    return results

# Usage
urls = [{'url': f'https://httpbin.org/delay/{i%3}', 'method': 'GET'} for i in range(50)]
results = batch_with_rate_limit(urls, batch_size=10, delay_between_batches=2)

Install with Tessl CLI

npx tessl i tessl/pypi-requests-toolbelt

docs

adapters.md

authentication.md

cookies-exceptions.md

downloads.md

index.md

multipart.md

sessions-streaming.md

threading.md

utilities.md

tile.json