CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-aiohttp

Async http client/server framework (asyncio)

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

utilities.mddocs/

Utilities

Supporting utilities and extensions including request tracing, DNS resolution, authentication helpers, compression configuration, and Gunicorn worker integration. These utilities provide integration points, performance optimization, and extensibility features for aiohttp applications.

Capabilities

Request Tracing

Comprehensive request tracing system for monitoring and debugging HTTP client operations with detailed event tracking.

class TraceConfig:
    """
    Collection of client tracing hooks, one per request/connection/DNS event.

    Each ``on_*`` entry below names one event and the params object that the
    registered callback receives for it.

    NOTE(review): in aiohttp itself every ``on_*`` member is a signal
    property rather than a method, so callbacks are registered with
    ``trace_config.on_request_start.append(callback)`` and are invoked as
    ``callback(session, trace_config_ctx, params)``. Confirm against the
    aiohttp tracing reference before relying on the call style shown here.
    """
    def __init__(self):
        """Create trace configuration."""
    
    def on_request_start(self, callback):
        """
        Register request start callback.
        
        Parameters:
        - callback: Async function receiving TraceRequestStartParams
        """
    
    def on_request_end(self, callback):
        """
        Register request end callback.
        
        Parameters:
        - callback: Async function receiving TraceRequestEndParams
        """
    
    def on_request_exception(self, callback):
        """
        Register request exception callback.
        
        Parameters:
        - callback: Async function receiving TraceRequestExceptionParams
        """
    
    def on_request_redirect(self, callback):
        """
        Register request redirect callback.
        
        Parameters:
        - callback: Async function receiving TraceRequestRedirectParams
        """
    
    def on_request_headers_sent(self, callback):
        """
        Register request headers sent callback.
        
        Parameters:
        - callback: Async function receiving TraceRequestHeadersSentParams
        """
    
    def on_request_chunk_sent(self, callback):
        """
        Register request chunk sent callback.
        
        Parameters:
        - callback: Async function receiving TraceRequestChunkSentParams
        """
    
    def on_response_chunk_received(self, callback):
        """
        Register response chunk received callback.
        
        Parameters:
        - callback: Async function receiving TraceResponseChunkReceivedParams
        """
    
    def on_connection_create_start(self, callback):
        """
        Register connection creation start callback.
        
        Parameters:
        - callback: Async function receiving TraceConnectionCreateStartParams
        """
    
    def on_connection_create_end(self, callback):
        """
        Register connection creation end callback.
        
        Parameters:
        - callback: Async function receiving TraceConnectionCreateEndParams
        """
    
    def on_connection_queued_start(self, callback):
        """
        Register connection queued start callback.
        
        Parameters:
        - callback: Async function receiving TraceConnectionQueuedStartParams
        """
    
    def on_connection_queued_end(self, callback):
        """
        Register connection queued end callback.
        
        Parameters:
        - callback: Async function receiving TraceConnectionQueuedEndParams
        """
    
    def on_connection_reuseconn(self, callback):
        """
        Register connection reuse callback.
        
        Parameters:
        - callback: Async function receiving TraceConnectionReuseconnParams
        """
    
    def on_dns_resolvehost_start(self, callback):
        """
        Register DNS resolve start callback.
        
        Parameters:
        - callback: Async function receiving TraceDnsResolveHostStartParams
        """
    
    def on_dns_resolvehost_end(self, callback):
        """
        Register DNS resolve end callback.
        
        Parameters:
        - callback: Async function receiving TraceDnsResolveHostEndParams
        """
    
    def on_dns_cache_hit(self, callback):
        """
        Register DNS cache hit callback.
        
        Parameters:
        - callback: Async function receiving TraceDnsCacheHitParams
        """
    
    def on_dns_cache_miss(self, callback):
        """
        Register DNS cache miss callback.
        
        Parameters:
        - callback: Async function receiving TraceDnsCacheMissParams
        """

# Trace event parameter classes
class TraceRequestStartParams:
    """Request start trace parameters."""
    @property
    def method(self): ...
    @property
    def url(self): ...
    @property
    def headers(self): ...

class TraceRequestEndParams:
    """Request end trace parameters."""
    @property
    def method(self): ...
    @property
    def url(self): ...
    @property
    def headers(self): ...
    @property
    def response(self): ...

class TraceRequestExceptionParams:
    """Request exception trace parameters."""
    @property
    def method(self): ...
    @property
    def url(self): ...
    @property
    def headers(self): ...
    @property
    def exception(self): ...

class TraceRequestRedirectParams:
    """Request redirect trace parameters."""
    @property
    def method(self): ...
    @property
    def url(self): ...
    @property
    def headers(self): ...
    @property
    def response(self): ...

class TraceRequestHeadersSentParams:
    """Request headers sent trace parameters."""
    @property
    def method(self): ...
    @property
    def url(self): ...
    @property
    def headers(self): ...

class TraceRequestChunkSentParams:
    """Request chunk sent trace parameters."""
    @property
    def method(self): ...
    @property
    def url(self): ...
    @property
    def chunk(self): ...

class TraceResponseChunkReceivedParams:
    """Response chunk received trace parameters."""
    @property
    def method(self): ...
    @property
    def url(self): ...
    @property
    def chunk(self): ...

class TraceConnectionCreateStartParams:
    """Connection creation start trace parameters."""
    pass

class TraceConnectionCreateEndParams:
    """Connection creation end trace parameters."""
    pass

class TraceConnectionQueuedStartParams:
    """Connection queued start trace parameters."""
    pass

class TraceConnectionQueuedEndParams:
    """Connection queued end trace parameters."""
    pass

class TraceConnectionReuseconnParams:
    """Connection reuse trace parameters."""
    pass

class TraceDnsResolveHostStartParams:
    """DNS resolve start trace parameters."""
    @property
    def host(self): ...

class TraceDnsResolveHostEndParams:
    """DNS resolve end trace parameters."""
    @property
    def host(self): ...

class TraceDnsCacheHitParams:
    """DNS cache hit trace parameters."""
    @property
    def host(self): ...

class TraceDnsCacheMissParams:
    """DNS cache miss trace parameters."""  
    @property
    def host(self): ...

DNS Resolution

Configurable DNS resolution with support for different resolver implementations and caching strategies.

class AsyncResolver:
    def __init__(self, loop=None):
        """
        Asynchronous DNS resolver.
        
        Parameters:
        - loop: Event loop
        """
    
    async def resolve(self, hostname, port=0, family=0):
        """
        Resolve hostname to addresses.
        
        Parameters:
        - hostname (str): Hostname to resolve
        - port (int): Port number
        - family (int): Address family
        
        Returns:
        List of resolved addresses
        """
    
    async def close(self):
        """Close resolver resources."""

class DefaultResolver:
    def __init__(self, loop=None):
        """
        Default system DNS resolver.
        
        Parameters:
        - loop: Event loop
        """
    
    async def resolve(self, hostname, port=0, family=0):
        """Resolve hostname using system resolver."""
    
    async def close(self):
        """Close resolver resources."""

class ThreadedResolver:
    def __init__(self, loop=None):
        """
        Thread-based DNS resolver.
        
        Parameters:
        - loop: Event loop
        """
    
    async def resolve(self, hostname, port=0, family=0):
        """Resolve hostname in thread pool."""
    
    async def close(self):
        """Close resolver resources."""

Helper Classes and Functions

Utility classes and functions for common operations and data structures.

class ChainMapProxy:
    def __init__(self, maps):
        """
        Chain map proxy for nested dictionaries.
        
        Parameters:
        - maps: Sequence of dictionaries to chain
        """
    
    def __getitem__(self, key):
        """Get item from chained maps."""
    
    def __contains__(self, key):
        """Check if key exists in any map."""
    
    def get(self, key, default=None):
        """Get item with default value."""
    
    def keys(self):
        """Get all keys from chained maps."""
    
    def values(self):
        """Get all values from chained maps."""
    
    def items(self):
        """Get all items from chained maps."""

class ETag:
    def __init__(self, value, is_weak=False):
        """
        ETag header handling.
        
        Parameters:
        - value (str): ETag value
        - is_weak (bool): Whether ETag is weak
        """
    
    @property
    def value(self):
        """ETag value."""
    
    @property
    def is_weak(self):
        """Whether ETag is weak."""
    
    def __str__(self):
        """
        String representation of ETag.

        NOTE(review): aiohttp's ETag is a plain frozen data class and does
        not appear to define a custom __str__; build the header form
        (``"value"`` or ``W/"value"``) explicitly from ``value`` and
        ``is_weak``. Confirm against the installed aiohttp version.
        """

class Fingerprint:
    def __init__(self, fingerprint):
        """
        SSL certificate fingerprint handling.
        
        Parameters:
        - fingerprint (bytes): Certificate fingerprint
        """
    
    def check(self, transport):
        """
        Verify certificate fingerprint.
        
        Parameters:
        - transport: SSL transport
        
        Raises:
        ServerFingerprintMismatch: If fingerprint doesn't match
        """

class RequestInfo:
    def __init__(self, url, method, headers, real_url=None):
        """
        Request information container.
        
        Parameters:
        - url: Request URL
        - method (str): HTTP method
        - headers: Request headers  
        - real_url: Requested URL with its fragment kept.
          NOTE(review): aiohttp documents real_url as the requested URL
          "with URL fragment unstripped", not as a post-redirect URL; confirm.
        """
    
    @property
    def url(self):
        """Request URL."""
    
    @property
    def method(self):
        """HTTP method."""
    
    @property
    def headers(self):
        """Request headers."""
    
    @property
    def real_url(self):
        """Requested URL with its fragment kept (see the note on __init__)."""

Compression Configuration

Select the zlib-compatible module that aiohttp uses for compression and decompression, for example a faster drop-in replacement for the standard-library zlib.

def set_zlib_backend(backend):
    """
    Replace the zlib implementation aiohttp uses for compression.

    Parameters:
    - backend: A module exposing the standard ``zlib`` API, such as the
      stdlib ``zlib`` module or a drop-in like ``zlib_ng.zlib_ng`` or
      ``isal.isal_zlib``. Pass the module object itself, not its name.

    NOTE(review): Brotli is not a zlib-compatible backend and is not selected
    through this function; confirm against the aiohttp client reference.
    """

Gunicorn Worker Integration

Gunicorn worker classes for deploying aiohttp applications in production environments.

class GunicornWebWorker:
    """Gunicorn worker for aiohttp applications."""
    
    def __init__(self, age, ppid, sockets, app, log, cfg):
        """
        Initialize Gunicorn worker.

        NOTE(review): workers are normally created by Gunicorn itself (chosen
        through the ``worker_class`` setting), and Gunicorn's base worker
        takes ``(age, ppid, sockets, app, timeout, cfg, log)``. The parameter
        list shown here omits ``timeout`` and orders ``log``/``cfg``
        differently; confirm before constructing one positionally.
        
        Parameters:
        - age: Worker age
        - ppid: Parent process ID
        - sockets: Server sockets
        - app: WSGI application
        - log: Logger instance
        - cfg: Gunicorn configuration
        """
    
    def init_process(self):
        """Initialize worker process."""
    
    def run(self):
        """Run worker main loop."""

class GunicornUVLoopWebWorker(GunicornWebWorker):
    """Gunicorn worker with uvloop for enhanced performance."""
    
    def init_process(self):
        """Initialize worker with uvloop."""

HTTP Headers Module

Complete set of HTTP header constants and utilities.

# HTTP method constants
METH_ANY = '*'
METH_CONNECT = 'CONNECT'
METH_HEAD = 'HEAD'
METH_GET = 'GET'
METH_DELETE = 'DELETE'
METH_OPTIONS = 'OPTIONS'
METH_PATCH = 'PATCH'
METH_POST = 'POST'
METH_PUT = 'PUT'
METH_TRACE = 'TRACE'

# Set of all HTTP methods
METH_ALL = frozenset([
    METH_CONNECT, METH_HEAD, METH_GET, METH_DELETE,
    METH_OPTIONS, METH_PATCH, METH_POST, METH_PUT, METH_TRACE
])

# HTTP header constants (in aiohttp.hdrs these are multidict.istr instances, i.e. case-insensitive strings; shown here as plain string literals)
ACCEPT = 'Accept'
ACCEPT_CHARSET = 'Accept-Charset'
ACCEPT_ENCODING = 'Accept-Encoding'
ACCEPT_LANGUAGE = 'Accept-Language'
ACCEPT_RANGES = 'Accept-Ranges'
ACCESS_CONTROL_ALLOW_CREDENTIALS = 'Access-Control-Allow-Credentials'
ACCESS_CONTROL_ALLOW_HEADERS = 'Access-Control-Allow-Headers'
ACCESS_CONTROL_ALLOW_METHODS = 'Access-Control-Allow-Methods'
ACCESS_CONTROL_ALLOW_ORIGIN = 'Access-Control-Allow-Origin'
ACCESS_CONTROL_EXPOSE_HEADERS = 'Access-Control-Expose-Headers'
ACCESS_CONTROL_MAX_AGE = 'Access-Control-Max-Age'
ACCESS_CONTROL_REQUEST_HEADERS = 'Access-Control-Request-Headers'
ACCESS_CONTROL_REQUEST_METHOD = 'Access-Control-Request-Method'
AGE = 'Age'
ALLOW = 'Allow'
AUTHORIZATION = 'Authorization'
CACHE_CONTROL = 'Cache-Control'
CONNECTION = 'Connection'
CONTENT_DISPOSITION = 'Content-Disposition'
CONTENT_ENCODING = 'Content-Encoding'
CONTENT_LANGUAGE = 'Content-Language'
CONTENT_LENGTH = 'Content-Length'
CONTENT_LOCATION = 'Content-Location'
CONTENT_MD5 = 'Content-MD5'
CONTENT_RANGE = 'Content-Range'
CONTENT_TRANSFER_ENCODING = 'Content-Transfer-Encoding'
CONTENT_TYPE = 'Content-Type'
COOKIE = 'Cookie'
DATE = 'Date'
DESTINATION = 'Destination'
DIGEST = 'Digest'
ETAG = 'ETag'
EXPECT = 'Expect'
EXPIRES = 'Expires'
FORWARDED = 'Forwarded'
FROM = 'From'
HOST = 'Host'
IF_MATCH = 'If-Match'
IF_MODIFIED_SINCE = 'If-Modified-Since'
IF_NONE_MATCH = 'If-None-Match'
IF_RANGE = 'If-Range'
IF_UNMODIFIED_SINCE = 'If-Unmodified-Since'
LAST_MODIFIED = 'Last-Modified'
LINK = 'Link'
LOCATION = 'Location'
MAX_FORWARDS = 'Max-Forwards'
ORIGIN = 'Origin'
PRAGMA = 'Pragma'
PROXY_AUTHENTICATE = 'Proxy-Authenticate'
PROXY_AUTHORIZATION = 'Proxy-Authorization'
RANGE = 'Range'
REFERER = 'Referer'
RETRY_AFTER = 'Retry-After'
SEC_WEBSOCKET_ACCEPT = 'Sec-WebSocket-Accept'
SEC_WEBSOCKET_KEY = 'Sec-WebSocket-Key'
SEC_WEBSOCKET_KEY1 = 'Sec-WebSocket-Key1'
SEC_WEBSOCKET_KEY2 = 'Sec-WebSocket-Key2'
SEC_WEBSOCKET_ORIGIN = 'Sec-WebSocket-Origin'
SEC_WEBSOCKET_PROTOCOL = 'Sec-WebSocket-Protocol'
SEC_WEBSOCKET_VERSION = 'Sec-WebSocket-Version'
SERVER = 'Server'
SET_COOKIE = 'Set-Cookie'
TE = 'TE'
TRAILER = 'Trailer'
TRANSFER_ENCODING = 'Transfer-Encoding'
UPGRADE = 'Upgrade'
URI = 'URI'
USER_AGENT = 'User-Agent'
VARY = 'Vary'
VIA = 'Via'
WARNING = 'Warning'
WWW_AUTHENTICATE = 'WWW-Authenticate'
X_FORWARDED_FOR = 'X-Forwarded-For'
X_FORWARDED_HOST = 'X-Forwarded-Host'
X_FORWARDED_PROTO = 'X-Forwarded-Proto'

Usage Examples

Request Tracing for Performance Monitoring

import aiohttp
import asyncio
import time

async def setup_tracing():
    """Build a TraceConfig that prints request, connection and DNS events.

    The ``on_*`` attributes of ``aiohttp.TraceConfig`` are signals, so each
    callback is attached with ``.append(...)`` rather than by calling the
    attribute.

    Returns:
        The configured ``aiohttp.TraceConfig``; pass it to
        ``aiohttp.ClientSession(trace_configs=[...])``.
    """
    trace_config = aiohttp.TraceConfig()

    # Request lifecycle tracking
    async def on_request_start(session, context, params):
        # Stash the start time on the trace context so the end/exception
        # callbacks of the same request can compute a duration. A monotonic
        # clock is used because wall-clock time can jump.
        context.start_time = time.monotonic()
        print(f"Starting request: {params.method} {params.url}")

    async def on_request_end(session, context, params):
        duration = time.monotonic() - context.start_time
        print(f"Request completed: {params.method} {params.url} "
              f"({params.response.status}) in {duration:.3f}s")

    async def on_request_exception(session, context, params):
        duration = time.monotonic() - context.start_time
        print(f"Request failed: {params.method} {params.url} "
              f"after {duration:.3f}s - {params.exception}")

    # Connection tracking
    async def on_connection_create_start(session, context, params):
        print("Creating new connection...")

    async def on_connection_create_end(session, context, params):
        print("Connection created successfully")

    async def on_connection_reuseconn(session, context, params):
        print("Reusing existing connection")

    # DNS tracking
    async def on_dns_resolvehost_start(session, context, params):
        print(f"Resolving DNS for {params.host}")

    async def on_dns_resolvehost_end(session, context, params):
        print(f"DNS resolved for {params.host}")

    async def on_dns_cache_hit(session, context, params):
        print(f"DNS cache hit for {params.host}")

    # Register callbacks: every on_* attribute is a signal (a list of
    # callbacks), so callbacks are appended to it.
    trace_config.on_request_start.append(on_request_start)
    trace_config.on_request_end.append(on_request_end)
    trace_config.on_request_exception.append(on_request_exception)
    trace_config.on_connection_create_start.append(on_connection_create_start)
    trace_config.on_connection_create_end.append(on_connection_create_end)
    trace_config.on_connection_reuseconn.append(on_connection_reuseconn)
    trace_config.on_dns_resolvehost_start.append(on_dns_resolvehost_start)
    trace_config.on_dns_resolvehost_end.append(on_dns_resolvehost_end)
    trace_config.on_dns_cache_hit.append(on_dns_cache_hit)

    return trace_config

async def traced_requests():
    """Issue a few GET requests through a session with tracing enabled.

    Client and timeout errors are reported per URL and do not stop the
    remaining requests; any other exception propagates to the caller.
    """
    trace_config = await setup_tracing()

    # Make multiple requests to see tracing in action
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/json',
        'https://httpbin.org/status/200',
        'https://httpbin.org/delay/1'  # This should reuse connection
    ]

    async with aiohttp.ClientSession(trace_configs=[trace_config]) as session:
        for url in urls:
            try:
                async with session.get(url) as response:
                    await response.read()
            # Catch only what the HTTP client is expected to raise, so that
            # programming errors are not hidden behind a printed message.
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                print(f"Request error: {e}")

# Run traced requests
asyncio.run(traced_requests())

Custom DNS Resolver

import aiohttp
import asyncio
import socket

class CustomResolver(aiohttp.AsyncResolver):
    """DNS resolver that answers selected hostnames from a static table.

    Hostnames listed in ``overrides`` resolve to the configured IP address;
    every other hostname is delegated to the parent resolver.

    NOTE(review): aiohttp.AsyncResolver depends on the optional ``aiodns``
    package; confirm it is installed where this is used.
    """

    def __init__(self, overrides=None, loop=None):
        """
        Parameters:
        - overrides: Mapping of hostname -> IPv4/IPv6 address literal
        - loop: Event loop, forwarded to the parent resolver
        """
        super().__init__(loop=loop)
        self.overrides = overrides or {}

    async def resolve(self, hostname, port=0, family=socket.AF_UNSPEC):
        """Resolve with local overrides.

        Returns:
        A list of address records. An overridden hostname yields exactly one
        record whose family matches the override address.

        Raises:
        OSError: If the override configured for ``hostname`` is not an IP
        address literal.
        """
        if hostname not in self.overrides:
            # Use default resolution
            return await super().resolve(hostname, port, family)

        # Local import keeps this snippet self-contained.
        import ipaddress

        override_addr = self.overrides[hostname]
        try:
            ip_version = ipaddress.ip_address(override_addr).version
        except ValueError as exc:
            # OSError is the usual resolver failure type, so the client can
            # report it as a connection error instead of a stray ValueError.
            raise OSError(
                f"Override for {hostname!r} is not an IP address: "
                f"{override_addr!r}"
            ) from exc

        return [{
            'hostname': hostname,
            'host': override_addr,
            'port': port,
            # Derive the family from the literal so IPv6 overrides work too.
            'family': socket.AF_INET6 if ip_version == 6 else socket.AF_INET,
            'proto': 0,
            # The host is already numeric; no further lookup is needed.
            'flags': socket.AI_NUMERICHOST,
        }]

async def use_custom_resolver():
    """Fetch a URL through a session whose connector uses CustomResolver.

    Returns the response body as text, or None after printing the error
    when the request raises aiohttp.ClientError.
    """
    # Map hostnames to custom addresses
    host_overrides = {
        'api.local': '127.0.0.1',
        'dev.example.com': '192.168.1.100'
    }
    resolver = CustomResolver(overrides=host_overrides)
    connector = aiohttp.TCPConnector(resolver=resolver)

    async with aiohttp.ClientSession(connector=connector) as session:
        try:
            # api.local is answered from the override table (127.0.0.1)
            async with session.get('http://api.local:8080/test') as response:
                body = await response.text()
        except aiohttp.ClientError as e:
            print(f"Request failed: {e}")
            return None
        return body

asyncio.run(use_custom_resolver())

Helper Utilities Usage

from aiohttp.helpers import ChainMapProxy, ETag, BasicAuth
import aiohttp

def demonstrate_helpers():
    """Print sample values from ChainMapProxy, ETag and BasicAuth."""
    
    # ChainMapProxy for nested configuration: lookups walk the maps in the
    # order given, so earlier maps shadow later ones.
    default_config = {'timeout': 30, 'retries': 3}
    user_config = {'timeout': 60}
    runtime_config = {'debug': True}
    
    config = ChainMapProxy([runtime_config, user_config, default_config])
    
    print(f"Timeout: {config['timeout']}")  # 60 (from user_config)
    print(f"Retries: {config['retries']}")  # 3 (from default_config)
    print(f"Debug: {config['debug']}")      # True (from runtime_config)
    
    # ETag handling
    # NOTE(review): aiohttp's ETag does not appear to define a custom
    # __str__, so the two prints below show the data-class repr, e.g.
    # ETag(value='abc123', is_weak=False), rather than the header forms
    # "abc123" and W/"def456" -- confirm against the installed version.
    etag = ETag('abc123', is_weak=False)
    print(f"ETag: {etag}")
    
    weak_etag = ETag('def456', is_weak=True)
    print(f"Weak ETag: {weak_etag}")
    
    # Basic authentication
    auth = BasicAuth('username', 'password')
    print(f"Auth login: {auth.login}")
    print(f"Auth password: {auth.password}")

demonstrate_helpers()

Production Deployment with Gunicorn

# gunicorn_config.py - Gunicorn configuration file
import multiprocessing

# Server socket
bind = "0.0.0.0:8000"
backlog = 2048

# Worker processes
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "aiohttp.GunicornWebWorker"
worker_connections = 1000
max_requests = 1000
max_requests_jitter = 50
timeout = 30
keepalive = 2

# Logging
accesslog = "-"
errorlog = "-"
loglevel = "info"
# aiohttp workers format access-log lines themselves, so an explicit format
# must use aiohttp's specifiers (%a remote address, %t time, %r request line,
# %s status, %b response size, %{Header}i request header, %D duration in
# microseconds) rather than Gunicorn's %(h)s-style names.
access_log_format = '%a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i" %D'

# Process naming
proc_name = 'aiohttp_app'

# Server mechanics
daemon = False
pidfile = '/tmp/aiohttp_app.pid'
user = None
group = None
tmp_upload_dir = None

# SSL (if needed)
# keyfile = '/path/to/keyfile'
# certfile = '/path/to/certfile'
# app.py - Application setup for Gunicorn
from aiohttp import web
import os
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def health_check(request):
    """Liveness probe: always answers with a JSON ``status: healthy`` body."""
    payload = {'status': 'healthy'}
    return web.json_response(payload)

async def api_endpoint(request):
    """Return a greeting plus the PID of the process that served the request."""
    body = {
        'message': 'Hello from aiohttp!',
        'worker_pid': os.getpid(),
    }
    return web.json_response(body)

def create_app():
    """Create and configure the application.

    Registers the health and hello routes, a request-timing middleware and
    startup/cleanup log hooks.

    Returns:
        The configured ``web.Application``.
    """
    # Imported here because this module's top-level imports do not include
    # ``time``; without it the middleware fails with NameError on the first
    # request.
    import time

    app = web.Application()

    # Add routes
    app.router.add_get('/health', health_check)
    app.router.add_get('/api/hello', api_endpoint)

    # Add middleware for logging
    @web.middleware
    async def logging_middleware(request, handler):
        # Monotonic clock: the measured duration is unaffected by wall-clock
        # adjustments.
        start_time = time.monotonic()
        response = await handler(request)
        process_time = time.monotonic() - start_time
        # Lazy %-style arguments: the message is only formatted when the
        # INFO level is actually enabled.
        logger.info(
            "%s %s - %s - %.3fs",
            request.method, request.path, response.status, process_time,
        )
        return response

    app.middlewares.append(logging_middleware)

    # Startup/shutdown hooks
    async def init_app(app):
        logger.info("Application starting up...")

    async def cleanup_app(app):
        logger.info("Application shutting down...")

    app.on_startup.append(init_app)
    app.on_cleanup.append(cleanup_app)

    return app

# For Gunicorn
app = create_app()

# For development
if __name__ == '__main__':
    web.run_app(app, host='localhost', port=8000)
# Deploy with Gunicorn
gunicorn app:app -c gunicorn_config.py

# Or with uvloop for better performance
pip install uvloop
gunicorn app:app -c gunicorn_config.py --worker-class aiohttp.GunicornUVLoopWebWorker

Compression Configuration

import zlib

import aiohttp

# Configure compression backend.
# set_zlib_backend expects a module that implements the zlib API (the module
# object itself, not its name); the standard-library zlib is the default.
aiohttp.set_zlib_backend(zlib)
# A faster zlib-compatible module such as zlib_ng.zlib_ng or isal.isal_zlib
# can be passed instead when it is installed. Brotli is not a zlib backend.

async def compressed_request():
    """Fetch a gzip-encoded JSON document and return it decoded.

    No Accept-Encoding header is set by hand: aiohttp sends one by default
    and only advertises ``br`` when a Brotli decoder is installed. Forcing
    ``br`` without one lets a server reply with a body the client cannot
    decode.

    Returns:
        The parsed JSON body of the response.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get('https://httpbin.org/gzip') as response:
            # Response is automatically decompressed
            return await response.json()

Install with Tessl CLI

npx tessl i tessl/pypi-aiohttp

docs

client.md

data.md

index.md

server.md

testing.md

utilities.md

websocket.md

tile.json