Async http client/server framework (asyncio)
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Supporting utilities and extensions including request tracing, DNS resolution, authentication helpers, compression configuration, and Gunicorn worker integration. These utilities provide integration points, performance optimization, and extensibility features for aiohttp applications.
Comprehensive request tracing system for monitoring and debugging HTTP client operations with detailed event tracking.
class TraceConfig:
def __init__(self):
"""Create trace configuration."""
def on_request_start(self, callback):
"""
Register request start callback.
Parameters:
- callback: Async function receiving TraceRequestStartParams
"""
def on_request_end(self, callback):
"""
Register request end callback.
Parameters:
- callback: Async function receiving TraceRequestEndParams
"""
def on_request_exception(self, callback):
"""
Register request exception callback.
Parameters:
- callback: Async function receiving TraceRequestExceptionParams
"""
def on_request_redirect(self, callback):
"""
Register request redirect callback.
Parameters:
- callback: Async function receiving TraceRequestRedirectParams
"""
def on_request_headers_sent(self, callback):
"""
Register request headers sent callback.
Parameters:
- callback: Async function receiving TraceRequestHeadersSentParams
"""
def on_request_chunk_sent(self, callback):
"""
Register request chunk sent callback.
Parameters:
- callback: Async function receiving TraceRequestChunkSentParams
"""
def on_response_chunk_received(self, callback):
"""
Register response chunk received callback.
Parameters:
- callback: Async function receiving TraceResponseChunkReceivedParams
"""
def on_connection_create_start(self, callback):
"""
Register connection creation start callback.
Parameters:
- callback: Async function receiving TraceConnectionCreateStartParams
"""
def on_connection_create_end(self, callback):
"""
Register connection creation end callback.
Parameters:
- callback: Async function receiving TraceConnectionCreateEndParams
"""
def on_connection_queued_start(self, callback):
"""
Register connection queued start callback.
Parameters:
- callback: Async function receiving TraceConnectionQueuedStartParams
"""
def on_connection_queued_end(self, callback):
"""
Register connection queued end callback.
Parameters:
- callback: Async function receiving TraceConnectionQueuedEndParams
"""
def on_connection_reuseconn(self, callback):
"""
Register connection reuse callback.
Parameters:
- callback: Async function receiving TraceConnectionReuseconnParams
"""
def on_dns_resolvehost_start(self, callback):
"""
Register DNS resolve start callback.
Parameters:
- callback: Async function receiving TraceDnsResolveHostStartParams
"""
def on_dns_resolvehost_end(self, callback):
"""
Register DNS resolve end callback.
Parameters:
- callback: Async function receiving TraceDnsResolveHostEndParams
"""
def on_dns_cache_hit(self, callback):
"""
Register DNS cache hit callback.
Parameters:
- callback: Async function receiving TraceDnsCacheHitParams
"""
def on_dns_cache_miss(self, callback):
"""
Register DNS cache miss callback.
Parameters:
- callback: Async function receiving TraceDnsCacheMissParams
"""
# Trace event parameter classes
class TraceRequestStartParams:
"""Request start trace parameters."""
@property
def method(self): ...
@property
def url(self): ...
@property
def headers(self): ...
class TraceRequestEndParams:
"""Request end trace parameters."""
@property
def method(self): ...
@property
def url(self): ...
@property
def headers(self): ...
@property
def response(self): ...
class TraceRequestExceptionParams:
"""Request exception trace parameters."""
@property
def method(self): ...
@property
def url(self): ...
@property
def headers(self): ...
@property
def exception(self): ...
class TraceRequestRedirectParams:
"""Request redirect trace parameters."""
@property
def method(self): ...
@property
def url(self): ...
@property
def headers(self): ...
@property
def response(self): ...
class TraceRequestHeadersSentParams:
"""Request headers sent trace parameters."""
@property
def method(self): ...
@property
def url(self): ...
@property
def headers(self): ...
class TraceRequestChunkSentParams:
"""Request chunk sent trace parameters."""
@property
def method(self): ...
@property
def url(self): ...
@property
def chunk(self): ...
class TraceResponseChunkReceivedParams:
"""Response chunk received trace parameters."""
@property
def method(self): ...
@property
def url(self): ...
@property
def chunk(self): ...
class TraceConnectionCreateStartParams:
"""Connection creation start trace parameters."""
pass
class TraceConnectionCreateEndParams:
"""Connection creation end trace parameters."""
pass
class TraceConnectionQueuedStartParams:
"""Connection queued start trace parameters."""
pass
class TraceConnectionQueuedEndParams:
"""Connection queued end trace parameters."""
pass
class TraceConnectionReuseconnParams:
"""Connection reuse trace parameters."""
pass
class TraceDnsResolveHostStartParams:
"""DNS resolve start trace parameters."""
@property
def host(self): ...
class TraceDnsResolveHostEndParams:
"""DNS resolve end trace parameters."""
@property
def host(self): ...
class TraceDnsCacheHitParams:
"""DNS cache hit trace parameters."""
@property
def host(self): ...
class TraceDnsCacheMissParams:
"""DNS cache miss trace parameters."""
@property
def host(self): ...Configurable DNS resolution with support for different resolver implementations and caching strategies.
class AsyncResolver:
def __init__(self, loop=None):
"""
Asynchronous DNS resolver.
Parameters:
- loop: Event loop
"""
async def resolve(self, hostname, port=0, family=0):
"""
Resolve hostname to addresses.
Parameters:
- hostname (str): Hostname to resolve
- port (int): Port number
- family (int): Address family
Returns:
List of resolved addresses
"""
async def close(self):
"""Close resolver resources."""
class DefaultResolver:
def __init__(self, loop=None):
"""
Default system DNS resolver.
Parameters:
- loop: Event loop
"""
async def resolve(self, hostname, port=0, family=0):
"""Resolve hostname using system resolver."""
async def close(self):
"""Close resolver resources."""
class ThreadedResolver:
def __init__(self, loop=None):
"""
Thread-based DNS resolver.
Parameters:
- loop: Event loop
"""
async def resolve(self, hostname, port=0, family=0):
"""Resolve hostname in thread pool."""
async def close(self):
"""Close resolver resources."""Utility classes and functions for common operations and data structures.
class ChainMapProxy:
def __init__(self, maps):
"""
Chain map proxy for nested dictionaries.
Parameters:
- maps: Sequence of dictionaries to chain
"""
def __getitem__(self, key):
"""Get item from chained maps."""
def __contains__(self, key):
"""Check if key exists in any map."""
def get(self, key, default=None):
"""Get item with default value."""
def keys(self):
"""Get all keys from chained maps."""
def values(self):
"""Get all values from chained maps."""
def items(self):
"""Get all items from chained maps."""
class ETag:
def __init__(self, value, is_weak=False):
"""
ETag header handling.
Parameters:
- value (str): ETag value
- is_weak (bool): Whether ETag is weak
"""
@property
def value(self):
"""ETag value."""
@property
def is_weak(self):
"""Whether ETag is weak."""
def __str__(self):
"""String representation of ETag."""
class Fingerprint:
def __init__(self, fingerprint):
"""
SSL certificate fingerprint handling.
Parameters:
- fingerprint (bytes): Certificate fingerprint
"""
def check(self, transport):
"""
Verify certificate fingerprint.
Parameters:
- transport: SSL transport
Raises:
ServerFingerprintMismatch: If fingerprint doesn't match
"""
class RequestInfo:
def __init__(self, url, method, headers, real_url=None):
"""
Request information container.
Parameters:
- url: Request URL
- method (str): HTTP method
- headers: Request headers
- real_url: Real URL after redirects
"""
@property
def url(self):
"""Request URL."""
@property
def method(self):
"""HTTP method."""
@property
def headers(self):
"""Request headers."""
@property
def real_url(self):
"""Real URL after redirects."""Compression backend configuration for optimized data processing.
def set_zlib_backend(backend):
"""
Configure zlib backend for compression.
Parameters:
- backend (str): Backend name ('zlib' or 'brotli')
"""Gunicorn worker classes for deploying aiohttp applications in production environments.
class GunicornWebWorker:
"""Gunicorn worker for aiohttp applications."""
def __init__(self, age, ppid, sockets, app, log, cfg):
"""
Initialize Gunicorn worker.
Parameters:
- age: Worker age
- ppid: Parent process ID
- sockets: Server sockets
- app: WSGI application
- log: Logger instance
- cfg: Gunicorn configuration
"""
def init_process(self):
"""Initialize worker process."""
def run(self):
"""Run worker main loop."""
class GunicornUVLoopWebWorker(GunicornWebWorker):
"""Gunicorn worker with uvloop for enhanced performance."""
def init_process(self):
"""Initialize worker with uvloop."""Complete set of HTTP header constants and utilities.
# HTTP method constants
METH_ANY = '*'
METH_CONNECT = 'CONNECT'
METH_HEAD = 'HEAD'
METH_GET = 'GET'
METH_DELETE = 'DELETE'
METH_OPTIONS = 'OPTIONS'
METH_PATCH = 'PATCH'
METH_POST = 'POST'
METH_PUT = 'PUT'
METH_TRACE = 'TRACE'
# Set of all HTTP methods
METH_ALL = frozenset([
METH_CONNECT, METH_HEAD, METH_GET, METH_DELETE,
METH_OPTIONS, METH_PATCH, METH_POST, METH_PUT, METH_TRACE
])
# HTTP header constants (istr objects for case-insensitive comparison)
ACCEPT = 'Accept'
ACCEPT_CHARSET = 'Accept-Charset'
ACCEPT_ENCODING = 'Accept-Encoding'
ACCEPT_LANGUAGE = 'Accept-Language'
ACCEPT_RANGES = 'Accept-Ranges'
ACCESS_CONTROL_ALLOW_CREDENTIALS = 'Access-Control-Allow-Credentials'
ACCESS_CONTROL_ALLOW_HEADERS = 'Access-Control-Allow-Headers'
ACCESS_CONTROL_ALLOW_METHODS = 'Access-Control-Allow-Methods'
ACCESS_CONTROL_ALLOW_ORIGIN = 'Access-Control-Allow-Origin'
ACCESS_CONTROL_EXPOSE_HEADERS = 'Access-Control-Expose-Headers'
ACCESS_CONTROL_MAX_AGE = 'Access-Control-Max-Age'
ACCESS_CONTROL_REQUEST_HEADERS = 'Access-Control-Request-Headers'
ACCESS_CONTROL_REQUEST_METHOD = 'Access-Control-Request-Method'
AGE = 'Age'
ALLOW = 'Allow'
AUTHORIZATION = 'Authorization'
CACHE_CONTROL = 'Cache-Control'
CONNECTION = 'Connection'
CONTENT_DISPOSITION = 'Content-Disposition'
CONTENT_ENCODING = 'Content-Encoding'
CONTENT_LANGUAGE = 'Content-Language'
CONTENT_LENGTH = 'Content-Length'
CONTENT_LOCATION = 'Content-Location'
CONTENT_MD5 = 'Content-MD5'
CONTENT_RANGE = 'Content-Range'
CONTENT_TRANSFER_ENCODING = 'Content-Transfer-Encoding'
CONTENT_TYPE = 'Content-Type'
COOKIE = 'Cookie'
DATE = 'Date'
DESTINATION = 'Destination'
DIGEST = 'Digest'
ETAG = 'ETag'
EXPECT = 'Expect'
EXPIRES = 'Expires'
FORWARDED = 'Forwarded'
FROM = 'From'
HOST = 'Host'
IF_MATCH = 'If-Match'
IF_MODIFIED_SINCE = 'If-Modified-Since'
IF_NONE_MATCH = 'If-None-Match'
IF_RANGE = 'If-Range'
IF_UNMODIFIED_SINCE = 'If-Unmodified-Since'
LAST_MODIFIED = 'Last-Modified'
LINK = 'Link'
LOCATION = 'Location'
MAX_FORWARDS = 'Max-Forwards'
ORIGIN = 'Origin'
PRAGMA = 'Pragma'
PROXY_AUTHENTICATE = 'Proxy-Authenticate'
PROXY_AUTHORIZATION = 'Proxy-Authorization'
RANGE = 'Range'
REFERER = 'Referer'
RETRY_AFTER = 'Retry-After'
SEC_WEBSOCKET_ACCEPT = 'Sec-WebSocket-Accept'
SEC_WEBSOCKET_KEY = 'Sec-WebSocket-Key'
SEC_WEBSOCKET_KEY1 = 'Sec-WebSocket-Key1'
SEC_WEBSOCKET_KEY2 = 'Sec-WebSocket-Key2'
SEC_WEBSOCKET_ORIGIN = 'Sec-WebSocket-Origin'
SEC_WEBSOCKET_PROTOCOL = 'Sec-WebSocket-Protocol'
SEC_WEBSOCKET_VERSION = 'Sec-WebSocket-Version'
SERVER = 'Server'
SET_COOKIE = 'Set-Cookie'
TE = 'TE'
TRAILER = 'Trailer'
TRANSFER_ENCODING = 'Transfer-Encoding'
UPGRADE = 'Upgrade'
URI = 'URI'
USER_AGENT = 'User-Agent'
VARY = 'Vary'
VIA = 'Via'
WARNING = 'Warning'
WWW_AUTHENTICATE = 'WWW-Authenticate'
X_FORWARDED_FOR = 'X-Forwarded-For'
X_FORWARDED_HOST = 'X-Forwarded-Host'
X_FORWARDED_PROTO = 'X-Forwarded-Proto'import aiohttp
import asyncio
import time
async def setup_tracing():
"""Setup comprehensive request tracing."""
trace_config = aiohttp.TraceConfig()
# Request lifecycle tracking
async def on_request_start(session, context, params):
context.start_time = time.time()
print(f"Starting request: {params.method} {params.url}")
async def on_request_end(session, context, params):
duration = time.time() - context.start_time
print(f"Request completed: {params.method} {params.url} "
f"({params.response.status}) in {duration:.3f}s")
async def on_request_exception(session, context, params):
duration = time.time() - context.start_time
print(f"Request failed: {params.method} {params.url} "
f"after {duration:.3f}s - {params.exception}")
# Connection tracking
async def on_connection_create_start(session, context, params):
print("Creating new connection...")
async def on_connection_create_end(session, context, params):
print("Connection created successfully")
async def on_connection_reuseconn(session, context, params):
print("Reusing existing connection")
# DNS tracking
async def on_dns_resolvehost_start(session, context, params):
print(f"Resolving DNS for {params.host}")
async def on_dns_resolvehost_end(session, context, params):
print(f"DNS resolved for {params.host}")
async def on_dns_cache_hit(session, context, params):
print(f"DNS cache hit for {params.host}")
# Register callbacks
trace_config.on_request_start(on_request_start)
trace_config.on_request_end(on_request_end)
trace_config.on_request_exception(on_request_exception)
trace_config.on_connection_create_start(on_connection_create_start)
trace_config.on_connection_create_end(on_connection_create_end)
trace_config.on_connection_reuseconn(on_connection_reuseconn)
trace_config.on_dns_resolvehost_start(on_dns_resolvehost_start)
trace_config.on_dns_resolvehost_end(on_dns_resolvehost_end)
trace_config.on_dns_cache_hit(on_dns_cache_hit)
return trace_config
async def traced_requests():
"""Make requests with tracing enabled."""
trace_config = await setup_tracing()
async with aiohttp.ClientSession(trace_configs=[trace_config]) as session:
# Make multiple requests to see tracing in action
urls = [
'https://httpbin.org/delay/1',
'https://httpbin.org/json',
'https://httpbin.org/status/200',
'https://httpbin.org/delay/1' # This should reuse connection
]
for url in urls:
try:
async with session.get(url) as response:
await response.read()
except Exception as e:
print(f"Request error: {e}")
# Run traced requests
asyncio.run(traced_requests())import aiohttp
import asyncio
import socket
class CustomResolver(aiohttp.AsyncResolver):
"""Custom DNS resolver with local overrides."""
def __init__(self, overrides=None, loop=None):
super().__init__(loop=loop)
self.overrides = overrides or {}
async def resolve(self, hostname, port=0, family=socket.AF_UNSPEC):
"""Resolve with local overrides."""
if hostname in self.overrides:
# Use override address
override_addr = self.overrides[hostname]
return [{
'hostname': hostname,
'host': override_addr,
'port': port,
'family': socket.AF_INET,
'proto': 0,
'flags': 0
}]
# Use default resolution
return await super().resolve(hostname, port, family)
async def use_custom_resolver():
"""Use custom DNS resolver."""
# Map hostnames to custom addresses
resolver = CustomResolver(overrides={
'api.local': '127.0.0.1',
'dev.example.com': '192.168.1.100'
})
connector = aiohttp.TCPConnector(resolver=resolver)
async with aiohttp.ClientSession(connector=connector) as session:
# This will resolve api.local to 127.0.0.1
try:
async with session.get('http://api.local:8080/test') as response:
return await response.text()
except aiohttp.ClientError as e:
print(f"Request failed: {e}")
asyncio.run(use_custom_resolver())from aiohttp.helpers import ChainMapProxy, ETag, BasicAuth
import aiohttp
def demonstrate_helpers():
"""Demonstrate utility helper classes."""
# ChainMapProxy for nested configuration
default_config = {'timeout': 30, 'retries': 3}
user_config = {'timeout': 60}
runtime_config = {'debug': True}
config = ChainMapProxy([runtime_config, user_config, default_config])
print(f"Timeout: {config['timeout']}") # 60 (from user_config)
print(f"Retries: {config['retries']}") # 3 (from default_config)
print(f"Debug: {config['debug']}") # True (from runtime_config)
# ETag handling
etag = ETag('abc123', is_weak=False)
print(f"ETag: {etag}") # "abc123"
weak_etag = ETag('def456', is_weak=True)
print(f"Weak ETag: {weak_etag}") # W/"def456"
# Basic authentication
auth = BasicAuth('username', 'password')
print(f"Auth login: {auth.login}")
print(f"Auth password: {auth.password}")
demonstrate_helpers()# gunicorn_config.py - Gunicorn configuration file
import multiprocessing
# Server socket
bind = "0.0.0.0:8000"
backlog = 2048
# Worker processes
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "aiohttp.GunicornWebWorker"
worker_connections = 1000
max_requests = 1000
max_requests_jitter = 50
timeout = 30
keepalive = 2
# Logging
accesslog = "-"
errorlog = "-"
loglevel = "info"
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s'
# Process naming
proc_name = 'aiohttp_app'
# Server mechanics
daemon = False
pidfile = '/tmp/aiohttp_app.pid'
user = None
group = None
tmp_upload_dir = None
# SSL (if needed)
# keyfile = '/path/to/keyfile'
# certfile = '/path/to/certfile'# app.py - Application setup for Gunicorn
from aiohttp import web
import os
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def health_check(request):
"""Health check endpoint."""
return web.json_response({'status': 'healthy'})
async def api_endpoint(request):
"""Sample API endpoint."""
return web.json_response({
'message': 'Hello from aiohttp!',
'worker_pid': os.getpid()
})
def create_app():
"""Create and configure the application."""
app = web.Application()
# Add routes
app.router.add_get('/health', health_check)
app.router.add_get('/api/hello', api_endpoint)
# Add middleware for logging
@web.middleware
async def logging_middleware(request, handler):
start_time = time.time()
response = await handler(request)
process_time = time.time() - start_time
logger.info(f"{request.method} {request.path} - {response.status} - {process_time:.3f}s")
return response
app.middlewares.append(logging_middleware)
# Startup/shutdown hooks
async def init_app(app):
logger.info("Application starting up...")
async def cleanup_app(app):
logger.info("Application shutting down...")
app.on_startup.append(init_app)
app.on_cleanup.append(cleanup_app)
return app
# For Gunicorn
app = create_app()
# For development
if __name__ == '__main__':
web.run_app(app, host='localhost', port=8000)# Deploy with Gunicorn
gunicorn app:app -c gunicorn_config.py
# Or with uvloop for better performance
pip install uvloop
gunicorn app:app -c gunicorn_config.py --worker-class aiohttp.GunicornUVLoopWebWorkerimport aiohttp
# Configure compression backend
aiohttp.set_zlib_backend('zlib') # Use standard zlib
# aiohttp.set_zlib_backend('brotli') # Use Brotli compression
async def compressed_request():
"""Make request with compression support."""
headers = {
'Accept-Encoding': 'gzip, deflate, br' # Support multiple encodings
}
async with aiohttp.ClientSession() as session:
async with session.get('https://httpbin.org/gzip',
headers=headers) as response:
# Response is automatically decompressed
data = await response.json()
return dataInstall with Tessl CLI
npx tessl i tessl/pypi-aiohttp