CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-requests-toolbelt

A utility belt for advanced users of python-requests

Pending
Overview
Eval results
Files

docs/sessions-streaming.md

Sessions and Streaming

Enhanced session classes with base URL support and streaming iterators for efficient handling of large uploads with known sizes, providing better control over HTTP request lifecycle and data streaming.

Capabilities

Base URL Sessions

Session class that automatically prepends a base URL to all requests, simplifying API client development.

class BaseUrlSession:
    """
    Session with automatic base URL handling for all requests.

    API stub: method bodies are intentionally empty (docstring-only) —
    the real implementation lives in requests_toolbelt.sessions and also
    provides the usual session verbs (get/post/...) used in the examples.

    Parameters:
    - base_url: str, base URL for all requests (optional)
    """
    def __init__(self, base_url: 'str | None' = None): ...
    
    def request(self, method: str, url: str, *args, **kwargs):
        """
        Send request with base URL automatically prepended.
        
        Parameters:
        - method: str, HTTP method
        - url: str, relative or absolute URL
        - *args, **kwargs: standard requests arguments
        
        Returns:
        Response: HTTP response object
        """
    
    def prepare_request(self, request, *args, **kwargs):
        """
        Prepare request with base URL handling.
        
        Parameters:
        - request: Request object to prepare
        - *args, **kwargs: preparation arguments
        
        Returns:
        PreparedRequest: prepared request with full URL
        """
    
    def create_url(self, url: str) -> str:
        """
        Create full URL from base URL and relative path.

        Joining follows urljoin-style semantics (see the leading-slash
        example below): a leading '/' replaces the base URL's path, and
        fully absolute URLs pass through unchanged.
        
        Parameters:
        - url: str, relative or absolute URL
        
        Returns:
        str: complete URL
        """

Usage Examples

import requests
from requests_toolbelt.sessions import BaseUrlSession

# API client with base URL
api_session = BaseUrlSession(base_url='https://api.example.com/v1/')

# All requests automatically use base URL
users = api_session.get('users/')  # GET https://api.example.com/v1/users/
user = api_session.get('users/123')  # GET https://api.example.com/v1/users/123
api_session.post('users/', json={'name': 'John'})  # POST https://api.example.com/v1/users/

# Absolute URLs still work (they bypass the base URL entirely)
external = api_session.get('https://external-api.com/data')

# Trailing slash behavior
session1 = BaseUrlSession('https://api.example.com/v1/')
session2 = BaseUrlSession('https://api.example.com/v1')

# NOTE(review): these two are NOT equivalent under urljoin-style joining —
# without the trailing slash, the relative path replaces the last segment:
# session1 -> https://api.example.com/v1/users
# session2 -> https://api.example.com/users   (the /v1 segment is dropped)
response1 = session1.get('users')
response2 = session2.get('users')

# Leading slash changes behavior (absolute path replaces the base path)
api_session = BaseUrlSession('https://api.example.com/v1/')
relative = api_session.get('users')  # https://api.example.com/v1/users
absolute = api_session.get('/users')  # https://api.example.com/users (note: no /v1/)

# Custom URL creation
class CustomBaseUrlSession(BaseUrlSession):
    """Session that tags every relative URL with a fixed version parameter."""

    def create_url(self, url):
        """Append ``version=2`` to relative URLs, then delegate to the base class."""
        # Fully absolute URLs pass through untouched.
        if url.startswith(('http://', 'https://')):
            return super().create_url(url)
        # Respect any query string already present on the relative URL.
        separator = '?' if '?' not in url else '&'
        return super().create_url(f"{url}{separator}version=2")

custom_session = CustomBaseUrlSession('https://api.example.com/')
response = custom_session.get('data')  # GET https://api.example.com/data?version=2

Streaming Iterator

Iterator interface for streaming large uploads with known sizes without using chunked transfer encoding.

class StreamingIterator:
    """
    Iterator for streaming data with known size to avoid chunked encoding.

    API stub: bodies are intentionally empty (docstring-only) — the real
    implementation is provided by requests_toolbelt.
    
    Parameters:
    - size: int, total size of data in bytes (must be positive)
    - iterator: iterator yielding data chunks or file-like object with read method
    - encoding: str, character encoding (default: 'utf-8')
    """
    def __init__(self, size: int, iterator, encoding: str = 'utf-8'): ...
    
    # Instance attributes
    size: int        # Expected size of the upload
    len: int         # Attribute that requests checks for body length  
    encoding: str    # Encoding the input data is using
    iterator: any    # The iterator used to generate upload data
    
    def read(self, n: int = -1) -> bytes:
        """
        Read up to n bytes from the iterator.
        
        Parameters:
        - n: int, number of bytes to read (-1 for available)
        
        Returns:
        bytes: data chunk
        """
    
    def __len__(self):
        """
        Return total size of data.

        requests consults this (or the ``len`` attribute) to send an exact
        Content-Length header instead of chunked transfer encoding.
        
        Returns:
        int: total size in bytes
        """
    
    def __iter__(self):
        """Return iterator interface."""

Usage Examples

import requests
from requests_toolbelt import StreamingIterator

# Stream large file with known size
def file_chunks(filename, chunk_size=8192):
    """Yield successive binary chunks of *filename* until EOF."""
    with open(filename, 'rb') as source:
        # iter(callable, sentinel) stops as soon as read() returns b''.
        for block in iter(lambda: source.read(chunk_size), b''):
            yield block

# Get file size
import os
file_size = os.path.getsize('large_file.dat')

# Create streaming iterator: the declared size must match the bytes the
# generator will actually yield, since it becomes the Content-Length.
stream = StreamingIterator(file_size, file_chunks('large_file.dat'))

# Upload without chunked encoding (len(stream) == file_size, so an exact
# Content-Length header is sent)
response = requests.post(
    'https://upload.example.com/files',
    data=stream,
    headers={'Content-Length': str(len(stream))}
)

# Stream generated data
def generate_data():
    """Generator that creates data on the fly, one text line per chunk."""
    for i in range(1000):
        # Fix: a real newline ('\n'), not a literal backslash-n as the
        # original doc showed — otherwise the payload has no line breaks
        # and the byte count below would be wrong.
        yield f"Line {i}: {'x' * 100}\n".encode('utf-8')

# Calculate total size up front: StreamingIterator needs the exact byte
# count so requests can send Content-Length instead of chunked encoding.
# This expression must mirror generate_data() exactly.
data_size = sum(len(f"Line {i}: {'x' * 100}\n".encode('utf-8')) for i in range(1000))

stream = StreamingIterator(data_size, generate_data())

response = requests.put(
    'https://api.example.com/data',
    data=stream,
    headers={
        'Content-Type': 'text/plain',
        'Content-Length': str(len(stream))
    }
)

# Stream with progress monitoring
def monitored_file_stream(filename, progress_callback=None):
    """Wrap *filename* in a StreamingIterator that reports upload progress.

    *progress_callback*, if given, is invoked as callback(bytes_so_far, total)
    after every chunk is read from disk.
    """
    total = os.path.getsize(filename)

    def _reporting_chunks():
        sent = 0
        with open(filename, 'rb') as fh:
            # Sentinel form of iter(): stops when read() yields b''.
            for chunk in iter(lambda: fh.read(8192), b''):
                sent += len(chunk)
                if progress_callback:
                    progress_callback(sent, total)
                yield chunk

    return StreamingIterator(total, _reporting_chunks())

def upload_progress(sent, total):
    """Print a single in-place progress line (overwritten on each call).

    Parameters:
    - sent: int, bytes transferred so far
    - total: int, total bytes expected (must be non-zero)
    """
    percent = (sent / total) * 100
    # Fix: '\r' (carriage return) rewinds the cursor so each report
    # overwrites the previous one; the original doc showed a literal
    # backslash-r, which would print garbage instead.
    print(f"\rUpload progress: {percent:.1f}% ({sent}/{total} bytes)", end='')

# Upload with progress
stream = monitored_file_stream('large_video.mp4', upload_progress)
response = requests.post(
    'https://upload.service.com/video',
    data=stream,
    # len(stream) equals the file size, so the server receives an exact
    # Content-Length header rather than chunked transfer encoding.
    headers={'Content-Length': str(len(stream))}
)
# Fix: real newline ('\n') to terminate the \r-overwritten progress line;
# the original doc showed a literal backslash-n.
print("\nUpload complete!")

Advanced Session Patterns

from requests_toolbelt.sessions import BaseUrlSession
from requests_toolbelt.utils import user_agent
from requests_toolbelt import GuessAuth

class APIClient(BaseUrlSession):
    """Enhanced API client with authentication and error handling."""

    def __init__(self, base_url, api_key=None, username=None, password=None):
        super().__init__(base_url)

        # Identify this client to the server.
        self.headers['User-Agent'] = user_agent('api-client', '1.0')

        # Bearer-token auth takes precedence over username/password.
        if api_key:
            self.headers['Authorization'] = f'Bearer {api_key}'
        elif username and password:
            self.auth = GuessAuth(username, password)

    def request(self, method, url, **kwargs):
        """Override to add error handling."""
        response = super().request(method, url, **kwargs)
        status = response.status_code

        # Translate well-known failure statuses into exceptions so callers
        # never have to inspect the response object for these cases.
        if status == 401:
            raise Exception("Authentication failed")
        if status == 429:
            raise Exception("Rate limit exceeded")
        if status >= 500:
            raise Exception(f"Server error: {status}")

        return response

# Usage
client = APIClient(
    'https://api.myservice.com/v2/',
    api_key='your-api-key-here'
)

try:
    users = client.get('users').json()
    user = client.post('users', json={'name': 'John', 'email': 'john@example.com'}).json()
    print(f"Created user: {user}")
except Exception as e:
    # APIClient.request raises plain Exception for 401/429/5xx responses,
    # so a broad except is what this example's API surface requires.
    print(f"API error: {e}")

# Multi-environment client
class MultiEnvAPIClient:
    """API client supporting multiple environments."""

    # Environment name -> service root; shared by every instance.
    _BASE_URLS = {
        'development': 'https://dev-api.example.com/',
        'staging': 'https://staging-api.example.com/',
        'production': 'https://api.example.com/'
    }

    def __init__(self, environment='production'):
        # Unknown environment names raise KeyError, same as the inline-dict form.
        self.session = BaseUrlSession(self._BASE_URLS[environment])
        self.session.headers['User-Agent'] = user_agent('multi-env-client', '1.0')

    def get_user(self, user_id):
        """Fetch a single user record by id."""
        return self.session.get(f'users/{user_id}').json()

    def create_user(self, user_data):
        """Create a user from a JSON-serializable dict."""
        return self.session.post('users', json=user_data).json()

# Usage: each client binds its own BaseUrlSession to one environment.
dev_client = MultiEnvAPIClient('development')
prod_client = MultiEnvAPIClient('production')

# Same interface, different endpoints
dev_user = dev_client.get_user(123)
prod_user = prod_client.get_user(123)

Memory-Efficient Streaming

import requests
from requests_toolbelt import StreamingIterator
import hashlib

def stream_with_checksum(data_iterator, total_size):
    """Upload *data_iterator* while computing its SHA-256 on the fly.

    Returns a (response, hex_digest) tuple; the digest covers exactly the
    bytes that were streamed to the server.
    """
    digest = hashlib.sha256()

    def _hashing_passthrough():
        # Tap each chunk into the hash before forwarding it unchanged.
        for piece in data_iterator:
            digest.update(piece)
            yield piece

    body = StreamingIterator(total_size, _hashing_passthrough())

    # Upload data; requests drains the iterator, which populates the hash.
    response = requests.post(
        'https://upload.example.com/verify',
        data=body,
        headers={
            'Content-Length': str(total_size),
            'Content-Type': 'application/octet-stream'
        }
    )

    # Return response and checksum
    return response, digest.hexdigest()

# Usage with large file
def large_file_iterator(filename):
    """Yield the contents of *filename* in 64 KB binary chunks."""
    with open(filename, 'rb') as fh:
        # Sentinel form of iter(): stops when read() yields b''.
        for chunk in iter(lambda: fh.read(64 * 1024), b''):
            yield chunk

# NOTE(review): this snippet relies on `os` being imported; an earlier
# snippet on this page imports it, but add `import os` when running
# this example standalone.
file_size = os.path.getsize('large_database_backup.sql')
response, checksum = stream_with_checksum(
    large_file_iterator('large_database_backup.sql'),
    file_size
)

print(f"Upload completed: {response.status_code}")
print(f"SHA256 checksum: {checksum}")

Install with Tessl CLI

npx tessl i tessl/pypi-requests-toolbelt

docs

adapters.md

authentication.md

cookies-exceptions.md

downloads.md

index.md

multipart.md

sessions-streaming.md

threading.md

utilities.md

tile.json